From 1d5548c24c792ab0ef8d16ace063e81596b0fba3 Mon Sep 17 00:00:00 2001 From: malice Date: Wed, 3 Jun 2026 22:36:26 -0600 Subject: [PATCH] v0.10.0: ITD 511 official API adapter (events + advisories + cameras) (#85) First official-state-DOT-API pattern landing. Two adapters in one PR: - itd_511 (event-class): polls Events (60s) + Advisories (300s) from https://511.idaho.gov/api/v2/get/{event,alerts}. Decodes EncodedPolyline to LineString via the polyline lib (bookend LineString or Point fallback); ITD Severity string mapped None->1 / Minor->2 / Major->3 with IsFullClosure=true forcing 3 regardless; RecurrenceSchedules / Restrictions / DetourPolyline pass through unmodified. Advisories ship as structural pass-through under data.advisory since the upstream /alerts endpoint currently returns []; per-record try/except keeps a surprise shape from sinking the cycle when ITD posts its first one. - itd_511_cameras (telemetry-class): polls Cameras (600s). One event per camera per UTC day; image URL passes straight through to . Region uniform US-ID with data.source_jurisdiction preserving the raw upstream Source field for the ~1.2% cross-DOT border-region mirrors (UDOT / ODOT / WYDOT / WSDOT / NDot / MTD / DriveBC / Lemhi County). Subject convention (v0.9.20 forward): central.traffic..us.id and central.traffic_cameras.us.id.. Castle Rock state_511_atis keeps its bare-state subject; consumers stay on central.traffic.> wildcards during the A/B comparison window. Retry predicate tightened from the Castle Rock / TomTom precedent: 5xx + connection / timeout retry; 4xx other than 429 skip-with-warn (don't burn quota on permanent errors); 429 honors Retry-After once then retries. API key (alias 'idaho_511') travels in the ?key= query string, so every error log path runs through self._redact() to scrub the URL. Both adapters ship disabled; operator enables via GUI after registering the API key with 'python -m set_api_key idaho_511'. Reuses existing CENTRAL_TRAFFIC and CENTRAL_TRAFFIC_CAMERAS streams -- no archive restart needed. Scope-cap exception: this PR is ~1.5k lines vs. the standard 500-line cap, authorized as a one-time exception for the first official-state-DOT-API pattern landing. Two adapters + their tests + real-API fixtures naturally exceed the v0.9.x adapter-cap budget. Co-authored-by: Claude Opus 4.7 (1M context) --- docs/CONSUMER-INTEGRATION.md | 99 ++++ pyproject.toml | 1 + sql/migrations/031_add_itd_511_adapters.sql | 28 ++ src/central/adapters/itd_511.py | 440 ++++++++++++++++++ src/central/adapters/itd_511_cameras.py | 249 ++++++++++ src/central/gui/routes.py | 2 +- .../gui/templates/_event_rows/itd_511.html | 16 + .../_event_rows/itd_511_cameras.html | 10 + .../templates/_event_summaries/itd_511.html | 9 + .../_event_summaries/itd_511_cameras.html | 5 + tests/fixtures/itd_511_alerts_sample.json | 1 + tests/fixtures/itd_511_cameras_sample.json | 90 ++++ tests/fixtures/itd_511_event_sample.json | 214 +++++++++ tests/test_events_feed_frontend.py | 4 + tests/test_itd_511.py | 398 ++++++++++++++++ tests/test_itd_511_cameras.py | 206 ++++++++ tests/test_telemetry_separation.py | 2 +- uv.lock | 11 + 18 files changed, 1783 insertions(+), 2 deletions(-) create mode 100644 sql/migrations/031_add_itd_511_adapters.sql create mode 100644 src/central/adapters/itd_511.py create mode 100644 src/central/adapters/itd_511_cameras.py create mode 100644 src/central/gui/templates/_event_rows/itd_511.html create mode 100644 src/central/gui/templates/_event_rows/itd_511_cameras.html create mode 100644 src/central/gui/templates/_event_summaries/itd_511.html create mode 100644 src/central/gui/templates/_event_summaries/itd_511_cameras.html create mode 100644 tests/fixtures/itd_511_alerts_sample.json create mode 100644 tests/fixtures/itd_511_cameras_sample.json create mode 100644 tests/fixtures/itd_511_event_sample.json create mode 100644 tests/test_itd_511.py create mode 100644 tests/test_itd_511_cameras.py diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index 964d608..7b2502b 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -1557,6 +1557,105 @@ conditions. `lastUpdated` field is camera-config time), so the drawer shows no "captured at". - **Removal semantics:** none; offline cameras serve an empty image but stay listed. +### itd_511 — Idaho 511 official DOT API (events + advisories) + +Idaho Transportation Department's official 511 REST API. Polls roadwork, +closures, incidents, special events, and advisories statewide. First +official-state-DOT-API adapter (v0.10.0); runs in parallel with state_511_atis +(Castle Rock) post-deploy for A/B comparison. Idaho-only; subject suffix is +uniformly `us.id`. + +- **Stream:** `CENTRAL_TRAFFIC` (event class). **event_type:** one of + `work_zone`, `closure`, `incident`, `special_event`, `advisory` (from + `category = ".itd_511"`). +- **Subject pattern:** `central.traffic..us.id` (v0.9.20 forward + convention -- `us.id` is the ISO-3166-2 region suffix; subscribe to all ITD + events via `central.traffic.>.us.id` or per-type via the leading wildcard). +- **Source:** `GET /api/v2/get/event?key=` (events, **cadence 60s**) + + `GET /api/v2/get/alerts?key=` (advisories, **cadence 300s** -- every + 5th poll). API key required (alias `idaho_511`). Documented limit 10 calls + / 60s; combined load ~1.3 calls/min. +- **EventType mapping:** `roadwork`->`work_zone`, `closures`->`closure`, + `accidentsAndIncidents`->`incident`, `specialEvents`->`special_event`. + Advisories always emit `event_type=advisory`. +- **Dedup key shape:** `idaho_511:event:` (events) / + `idaho_511:advisory:` (advisories). SourceId is the upstream-allocated + stable id; the ITD-internal `ID` is used as fallback. +- **Geometry:** decoded EncodedPolyline (Google polyline format) -> LineString; + falls back to bookend LineString (Latitude,Longitude + Latitude2,Longitude2) + -> single Point. Shipped via `geo.geometry` so PostGIS renders the affected + segment as a polyline on the map. +- **Severity:** ITD `Severity` (string) mapped `None`->1, `Minor`->2, `Major`->3. + `IsFullClosure=true` forces severity 3 regardless (orthogonal upstream + signals in the live data -- 15 of 152 None-severity events were full + closures at landing). +- **Advisories:** structural pass-through under `data.advisory`. The upstream + `/alerts` endpoint returned `[]` at adapter landing; the parser probes a few + likely id / timestamp / coord fields best-effort and stores the entire + record so v0.10.x can refine field mapping once a real advisory lands. +- **Event.data fields (events):** + + | key | type | nullable | description | + |---|---|---|---| + | `event_type_short` | str | no | One of work_zone / closure / incident / special_event / advisory | + | `event_sub_type` | str | yes | Rich vocabulary, e.g. `bridgeConstruction`, `nightTimeConstructionWork` | + | `roadway_name` | str | yes | e.g. `I-84`, `SH-16` | + | `direction` | str | yes | `East` / `Both` / `Unknown` (Unknown suppressed in L-c text) | + | `description` / `comment` | str | yes | Operator text | + | `lanes_affected` | str | yes | e.g. `2 Left Lanes Blocked` | + | `is_full_closure` | bool | no | Drives the severity-3 override | + | `itd_severity` | str | yes | Raw ITD value (`Major` / `Minor` / `None`) | + | `cause` | str | yes | Usually mirrors EventType (`roadwork`, `Incident`, `specialEvents`) | + | `organization` | str | yes | Uniformly `ERS` at landing | + | `recurrence_text` | str | yes | HTML schedule (consumer should `striptags`) | + | `recurrence_schedules` | list | yes | Structured `[{StartDate, EndDate, Times, DaysOfWeek}]` | + | `restrictions` | dict | yes | `{Width, Height, Length, Weight, Speed}` (often all null) | + | `detour_polyline` / `detour_instructions` | str | yes | Detour geometry + text | + | `encoded_polyline` | str | yes | Raw EncodedPolyline (also decoded into geo.geometry) | + | `id_internal` / `source_id` | int / str | no | ITD-internal id + upstream-stable SourceId | + | `reported_epoch` / `last_updated_epoch` / `start_epoch` / `planned_end_epoch` | int | yes | Unix epoch (UTC); `Event.time` uses LastUpdated -> Reported -> StartDate priority | + | `latitude` / `longitude` | float | yes | Primary point (enrichment input) | + +### itd_511_cameras — Idaho 511 official DOT API cameras (telemetry) + +Idaho Transportation Department's traffic camera directory. One telemetry +event per camera per UTC day; the `/telemetry` detail drawer renders the live +image inline (`` fetched direct from the source -- Central stores the URL, +never the image bytes). Sibling adapter of itd_511 (shared API key alias +`idaho_511`). + +- **Stream:** `CENTRAL_TRAFFIC_CAMERAS` (telemetry; `/telemetry` tab). +- **Subject pattern:** `central.traffic_cameras.us.id.` -- subscribe + to one camera or `central.traffic_cameras.us.id.>` for all ITD cameras. +- **GUI event_type:** `camera` (from `category = "camera.itd_511_cameras"`). +- **Source:** `GET /api/v2/get/cameras?key=`. 664 cameras at landing; + **cadence 600s**. ITD aggregates ~1.2% border-region mirrors from + neighbouring DOTs (UDOT, ODOT, WYDOT, WSDOT, NDot, MTD, DriveBC, Lemhi + County). All cameras are tagged region `US-ID`; `data.source_jurisdiction` + preserves the raw upstream `Source` so consumers can re-bucket the cross-DOT + mirrors if needed. +- **Dedup key shape:** `idaho_511:cam::` -- one event + per camera per UTC day. The table always shows today's cameras; no per-poll + flooding. +- **Image URL:** `https://511.idaho.gov/map/Cctv/` -- publicly + reachable, no auth. Format may be jpeg / gif / png (mixed per camera); + `` handles all. +- **Event.data fields:** + + | key | type | nullable | description | + |---|---|---|---| + | `camera_id` | int | no | Stable upstream id | + | `roadway` | str | yes | e.g. `I-84`, `I-15` | + | `direction` | str | yes | `North` / `Unknown` (Unknown suppressed in row partial) | + | `location` | str | yes | Humanized (e.g. `I-15 UT/ID State Line UT`) | + | `source` / `source_jurisdiction` | str | yes | ITDNET / ACHD / RWIS / Idaho511 / UDOT / ODOT / WYDOT / ... | + | `source_id_upstream` | str | yes | Source-specific id (e.g. UDOT `10.C1`) | + | `image_url` | str | yes | First Views[].Url (live image; jpeg/gif/png) | + | `additional_views` | list[str] | no | URLs of Views[1:] when a camera has multiple angles | + | `view_count` / `view_descriptions` | int / list[str] | no / no | Total views + per-view labels | + | `sort_order` | int | yes | Upstream display order hint | + | `latitude` / `longitude` | float | yes | Camera location (enrichment input) | + ### tomtom_incidents — TomTom real-time traffic incidents (commercial coverage) Real-time incidents (closures, jams, hazards, road work, accidents) from TomTom diff --git a/pyproject.toml b/pyproject.toml index 9dd4745..22c50a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "fastapi>=0.115.0", "jinja2>=3.1.6", "nats-py>=2.14.0", + "polyline>=2.0,<3", # itd_511: Google polyline decode (EncodedPolyline → LineString) "pydantic>=2,<3", "pydantic-settings>=2.7.0", "python-multipart>=0.0.20", diff --git a/sql/migrations/031_add_itd_511_adapters.sql b/sql/migrations/031_add_itd_511_adapters.sql new file mode 100644 index 0000000..0f5ddcd --- /dev/null +++ b/sql/migrations/031_add_itd_511_adapters.sql @@ -0,0 +1,28 @@ +-- Migration: 031_add_itd_511_adapters +-- Adds the itd_511 (event-class) and itd_511_cameras (telemetry-class) adapters +-- onto the EXISTING CENTRAL_TRAFFIC and CENTRAL_TRAFFIC_CAMERAS streams. NO +-- new streams -> no central-archive restart needed +-- (feedback_new_stream_needs_archive_restart). Both ship disabled; operator +-- enables via GUI after registering the 'idaho_511' API key +-- (python -m set_api_key, alias 'idaho_511'). +-- itd_511 polls 60s (events) + 300s (advisories sub-poll); ~1.3 calls/min +-- combined, well under ITD's documented 10/60s budget. +-- Additive-only: idempotent via ON CONFLICT DO NOTHING. + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'itd_511', + false, + 60, + '{"api_key_alias": "idaho_511"}'::jsonb +) +ON CONFLICT (name) DO NOTHING; + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'itd_511_cameras', + false, + 600, + '{"api_key_alias": "idaho_511"}'::jsonb +) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/itd_511.py b/src/central/adapters/itd_511.py new file mode 100644 index 0000000..641b8fc --- /dev/null +++ b/src/central/adapters/itd_511.py @@ -0,0 +1,440 @@ +"""ITD 511 traffic adapter — Idaho DOT official REST API (events + advisories). + +Polls /api/v2/get/event (60s) and /api/v2/get/alerts (300s = every 5th poll) +from https://511.idaho.gov per v0.10.0. Sibling adapter ``itd_511_cameras`` +(data_class="telemetry") handles /get/cameras. Both ship to existing streams +(CENTRAL_TRAFFIC / CENTRAL_TRAFFIC_CAMERAS) — no archive restart needed. + +EncodedPolyline (Google polyline format) decodes to a LineString via the +``polyline`` PyPI dep; falls back to (Latitude, LatitudeSecondary) bookend +LineString or a single Point when geometry is sparse. RecurrenceSchedules, +Restrictions, and DetourPolyline pass through unmodified for downstream +consumers (truckers, route planners). + +ITD Severity is a string ("Major"/"Minor"/"None") not 0-3 as the v0.10.0 spec +initially assumed; mapping is None->1, Minor->2, Major->3 with IsFullClosure +=true forcing 3 regardless (Severity and full-closure are orthogonal upstream +signals — 15 of 152 "None"-severity events are full closures in the live data). + +Subject convention (v0.9.20 forward, locked v0.10.0 architectural call A): + central.traffic..us.id + +where event_type ∈ {incident, closure, work_zone, special_event, advisory}. + +Dedup id: ``idaho_511:event:`` (SourceId is upstream-allocated, more +stable across ITD-side restamps than the internal ID; falls back to ID if +SourceId is null, never happened in the v0.10.0 step-0 snapshot). + +Retry predicate tightened from the Castle Rock/TomTom precedent (architectural +call B): 5xx + connection/timeout retry; 4xx other than 429 skip the poll with +a warn (don't burn quota on permanent errors like a bad key); 429 honors +Retry-After once then retries. + +API key (alias 'idaho_511') travels in the ?key= query string — aiohttp's +default error messages leak the URL, so every error log path runs through +``self._redact()``. +""" + +import asyncio +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +import polyline as polyline_lib +from pydantic import BaseModel +from tenacity import ( + after_nothing, + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +_BASE_URL = "https://511.idaho.gov/api/v2/get" +_FETCH_TIMEOUT_S = 30 +_MAX_RETRY_AFTER_S = 60 # cap on Retry-After honor; longer => skip the cycle +_ADVISORY_EVERY_N_POLLS = 5 # 60s * 5 = 300s effective cadence + +_DEDUP_DDL = ( + "CREATE TABLE IF NOT EXISTS published_ids (" + "adapter TEXT NOT NULL, event_id TEXT NOT NULL, " + "first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "PRIMARY KEY (adapter, event_id))" +) + +# ITD EventType -> Central event_type prefix (category = ".itd_511", +# subject = "central.traffic..us.id"). Advisories use "advisory". +EVENT_TYPE_MAP = { + "roadwork": "work_zone", + "closures": "closure", + "accidentsAndIncidents": "incident", + "specialEvents": "special_event", +} + +# ITD Severity (string) -> Central 1-4. IsFullClosure=true forces 3 regardless +# (orthogonal upstream signals; 15/152 "None"-severity events are full closures). +_SEVERITY_MAP = {"None": 1, "Minor": 2, "Major": 3} + + +class _Transient(Exception): + """Internal: retryable. 5xx or 429 (with optional Retry-After honor). + + ``wait_s`` carries an explicit wait override the retry strategy reads, + so a documented Retry-After value drives the next attempt's delay + directly. Fixes the BUG C double-wait: the previous shape did an inline + ``await asyncio.sleep(wait_s)`` AND then let tenacity's exponential + jitter wait again on top, blocking ~120s+ per cycle for a single 429 + with Retry-After=60. + """ + + def __init__(self, msg: str, *, wait_s: int | None = None) -> None: + super().__init__(msg) + self.wait_s = wait_s + + +_EXP_WAIT = wait_exponential_jitter(initial=1, max=30) + + +def _wait_strategy(retry_state: Any) -> float: + """Tenacity wait callable. Honors ``_Transient.wait_s`` when set (429 + Retry-After path); else falls through to exponential jitter. Shared + across itd_511 and itd_511_cameras via direct import.""" + if retry_state.outcome is not None: + exc = retry_state.outcome.exception() + if isinstance(exc, _Transient) and exc.wait_s is not None: + return float(exc.wait_s) + return float(_EXP_WAIT(retry_state)) + + +def _parse_epoch(value: Any) -> datetime | None: + """Unix epoch seconds -> UTC datetime; None on missing/non-numeric/out-of-range.""" + if value is None: + return None + try: + return datetime.fromtimestamp(int(value), tz=timezone.utc) + except (ValueError, OSError, TypeError): + return None + + +def _strip_or_none(value: Any) -> Any: + """Strip a string (handles EventSubType trailing-space artifact); '' -> None. + Non-strings pass through unchanged so the call site can stay terse.""" + if not isinstance(value, str): + return value + stripped = value.strip() + return stripped or None + + +def _itd_severity(itd_sev: str | None, is_full_closure: bool) -> int: + """ITD Severity + IsFullClosure -> Central 1-4 (None->1, Minor->2, Major->3, + full-closure overrides to 3 regardless).""" + if is_full_closure: + return 3 + return _SEVERITY_MAP.get(itd_sev or "", 1) + + +def _decode_polyline(encoded: str | None) -> list[tuple[float, float]]: + """Decode a Google polyline -> list of (lat, lon). [] on null/empty/invalid.""" + if not encoded: + return [] + try: + return polyline_lib.decode(encoded) + except Exception: # the lib raises a grab-bag of unicode/index errors on bad input + return [] + + +def _build_geometry( + lat: float | None, lon: float | None, + lat2: float | None, lon2: float | None, + encoded: str | None, +) -> tuple[dict[str, Any] | None, tuple[float, float] | None]: + """Pick the best geometry available -> (GeoJSON geometry, centroid lon/lat). + + Priority: decoded EncodedPolyline LineString -> bookend LineString + (primary + secondary lat/lon) -> single Point. Centroid is the first vertex. + """ + decoded = _decode_polyline(encoded) + if len(decoded) >= 2: + coords = [(lon_, lat_) for lat_, lon_ in decoded] + return {"type": "LineString", "coordinates": coords}, coords[0] + if lat is not None and lon is not None: + if lat2 is not None and lon2 is not None: + return ({"type": "LineString", "coordinates": [(lon, lat), (lon2, lat2)]}, + (lon, lat)) + return {"type": "Point", "coordinates": (lon, lat)}, (lon, lat) + return None, None + + +class Itd511Settings(BaseModel): + """api_key_alias: config.api_keys entry (default 'idaho_511', shared with + itd_511_cameras). ITD is Idaho-only — no bbox/state list; region tagged + uniformly US-ID.""" + + api_key_alias: str = "idaho_511" + + +class Itd511Adapter(SourceAdapter): + """Idaho 511 official API — events + advisories.""" + + name = "itd_511" + display_name = "Idaho 511 (Official API)" + description = ( + "Idaho Transportation Department's official 511 REST API. Polls roadwork, " + "closures, incidents, special events, and advisories statewide. Geometry " + "via decoded EncodedPolyline; ships to CENTRAL_TRAFFIC." + ) + settings_schema = Itd511Settings + requires_api_key = "idaho_511" + api_key_field = "api_key_alias" + wizard_order = None # Ships disabled + default_cadence_s = 60 + data_class = "event" + enrichment_locations = [("latitude", "longitude")] + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._api_key_alias: str = config.settings.get("api_key_alias", "idaho_511") + self._api_key: str | None = None + self._adv_counter = 0 # advisories polled on counter == 0 + + def _redact(self, text: str) -> str: + return text.replace(self._api_key, "") if self._api_key else text + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S), + headers={"User-Agent": "Central/0.10 (+itd_511)"}, + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(_DEDUP_DDL) + self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)") + self._db.commit() + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info("itd_511 adapter started", + extra={"alias": self._api_key_alias, "api_key_present": bool(self._api_key)}) + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + self._api_key_alias = new_config.settings.get("api_key_alias", "idaho_511") + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info("itd_511 config updated", + extra={"alias": self._api_key_alias, "api_key_present": bool(self._api_key)}) + + @retry( + stop=stop_after_attempt(3), + wait=_wait_strategy, + retry=retry_if_exception_type((_Transient, aiohttp.ClientConnectionError, + asyncio.TimeoutError, TimeoutError)), + reraise=True, # propagate _Transient/ClientError verbatim, not RetryError + before_sleep=None, # explicit per BUG D5 audit: tenacity must not log between attempts + after=after_nothing, # explicit per BUG D5 audit: tenacity must not log after attempts + ) + async def _fetch(self, endpoint: str) -> list[dict[str, Any]] | None: + """GET /api/v2/get/?key=. Returns the list, or None on a + permanent 4xx (skip cycle, don't retry). Raises _Transient on 5xx/429 + for tenacity to retry; the 429 path attaches the Retry-After value via + ``_Transient(wait_s=...)`` so ``_wait_strategy`` honors it directly + (no in-handler sleep — fixes BUG C double-wait). Key is redacted in + every error log.""" + if self._session is None: + raise RuntimeError("itd_511 session not started") + async with self._session.get( + f"{_BASE_URL}/{endpoint}", params={"key": self._api_key}, + ) as resp: + if resp.status == 200: + return await resp.json(content_type=None) + body_preview = self._redact((await resp.text())[:200]) + if resp.status == 429: + ra = resp.headers.get("Retry-After", "") + wait_s = min(int(ra) if ra.isdigit() else 5, _MAX_RETRY_AFTER_S) + logger.warning("itd_511 rate-limited", + extra={"endpoint": endpoint, "retry_after": wait_s, "body": body_preview}) + raise _Transient(f"429 retry-after={wait_s}", wait_s=wait_s) + if 500 <= resp.status < 600: + logger.warning("itd_511 upstream 5xx", + extra={"endpoint": endpoint, "status": resp.status, "body": body_preview}) + raise _Transient(f"{resp.status} server error") + logger.warning("itd_511 permanent error; skipping endpoint", + extra={"endpoint": endpoint, "status": resp.status, "body": body_preview}) + return None + + def _build_event_record(self, rec: dict[str, Any]) -> Event | None: + """Build an Event from one /get/event row. Returns None without a stable id.""" + source_id = rec.get("SourceId") or rec.get("ID") + if source_id is None: + return None + et = EVENT_TYPE_MAP.get(rec.get("EventType") or "", "incident") + lat, lon = rec.get("Latitude"), rec.get("Longitude") + geom, centroid = _build_geometry( + lat, lon, rec.get("LatitudeSecondary"), rec.get("LongitudeSecondary"), + rec.get("EncodedPolyline"), + ) + return Event( + id=f"idaho_511:event:{source_id}", + adapter=self.name, + category=f"{et}.itd_511", + time=(_parse_epoch(rec.get("LastUpdated")) + or _parse_epoch(rec.get("Reported")) + or _parse_epoch(rec.get("StartDate")) + or datetime.now(timezone.utc)), + expires=_parse_epoch(rec.get("PlannedEndDate")), + severity=_itd_severity(rec.get("Severity"), bool(rec.get("IsFullClosure"))), + geo=Geo( + centroid=centroid, geometry=geom, + regions=["US-ID"], primary_region="US-ID", + ), + data={ + "event_type_short": et, + "event_sub_type": _strip_or_none(rec.get("EventSubType")), + "roadway_name": _strip_or_none(rec.get("RoadwayName")), + "direction": _strip_or_none(rec.get("DirectionOfTravel")), + "description": _strip_or_none(rec.get("Description")), + "lanes_affected": _strip_or_none(rec.get("LanesAffected")), + "is_full_closure": bool(rec.get("IsFullClosure")), + "itd_severity": rec.get("Severity"), + "comment": _strip_or_none(rec.get("Comment")), + "cause": _strip_or_none(rec.get("Cause")), + "organization": rec.get("Organization"), + "recurrence_text": _strip_or_none(rec.get("Recurrence")), + "recurrence_schedules": rec.get("RecurrenceSchedules") or [], + "restrictions": rec.get("Restrictions") or {}, + "detour_polyline": rec.get("DetourPolyline") or None, + "detour_instructions": _strip_or_none(rec.get("DetourInstructions")), + "encoded_polyline": rec.get("EncodedPolyline"), + "id_internal": rec.get("ID"), + "source_id": rec.get("SourceId"), + "reported_epoch": rec.get("Reported"), + "last_updated_epoch": rec.get("LastUpdated"), + "start_epoch": rec.get("StartDate"), + "planned_end_epoch": rec.get("PlannedEndDate"), + "latitude": lat, + "longitude": lon, + }, + ) + + def _build_advisory_record(self, rec: dict[str, Any]) -> Event | None: + """Build an Event from one /get/alerts row. ITD's currently-empty + response means we can't field-map exactly; we structural-pass-through + the whole record under ``data.advisory`` and probe a few likely id / + timestamp / coord fields best-effort. Per-record try/except in poll() + keeps a surprise shape from sinking the cycle.""" + source_id = (rec.get("SourceId") or rec.get("ID") + or rec.get("AlertId") or rec.get("Id")) + if source_id is None: + return None + lat, lon = rec.get("Latitude"), rec.get("Longitude") + geom, centroid = _build_geometry( + lat, lon, rec.get("LatitudeSecondary"), rec.get("LongitudeSecondary"), + rec.get("EncodedPolyline"), + ) + return Event( + id=f"idaho_511:advisory:{source_id}", + adapter=self.name, + category="advisory.itd_511", + time=(_parse_epoch(rec.get("LastUpdated")) + or _parse_epoch(rec.get("Reported")) + or _parse_epoch(rec.get("StartDate")) + or datetime.now(timezone.utc)), + expires=_parse_epoch(rec.get("PlannedEndDate") or rec.get("EndDate")), + severity=_itd_severity(rec.get("Severity"), False), + geo=Geo( + centroid=centroid, geometry=geom, + regions=["US-ID"], primary_region="US-ID", + ), + data={ + "event_type_short": "advisory", + "description": _strip_or_none(rec.get("Description") or rec.get("Message")), + "roadway_name": _strip_or_none(rec.get("RoadwayName")), + "advisory": rec, # full structural pass-through + "source_id": rec.get("SourceId"), + "latitude": lat, + "longitude": lon, + }, + ) + + async def poll(self) -> AsyncIterator[Event]: + if not self._session: + raise RuntimeError("Session not initialized") + if not self._api_key: + logger.warning("itd_511: no API key for alias; skipping poll", + extra={"alias": self._api_key_alias}) + return + + do_advisories = self._adv_counter == 0 + self._adv_counter = (self._adv_counter + 1) % _ADVISORY_EVERY_N_POLLS + + endpoints = ["event"] + (["alerts"] if do_advisories else []) + results = await asyncio.gather( + *[self._fetch(ep) for ep in endpoints], return_exceptions=True, + ) + + events_raw: list[dict[str, Any]] | None = None + adv_raw: list[dict[str, Any]] | None = None + for ep, result in zip(endpoints, results): + if isinstance(result, BaseException): + logger.warning("itd_511 fetch failed", + extra={"endpoint": ep, "error": self._redact(str(result))}) + continue + if ep == "event": + events_raw = result + else: + adv_raw = result + + yielded = 0 + for rec in (events_raw or []): + try: + ev = self._build_event_record(rec) + except Exception: # one bad record never sinks the poll + logger.exception("itd_511 event parse failed", + extra={"id": rec.get("ID"), "source_id": rec.get("SourceId")}) + continue + if ev is not None: + yield ev + yielded += 1 + for rec in (adv_raw or []): + try: + ev = self._build_advisory_record(rec) + except Exception: # advisory shape unverified — be defensive + logger.exception("itd_511 advisory parse failed", + extra={"id": rec.get("Id") or rec.get("ID")}) + continue + if ev is not None: + yield ev + yielded += 1 + + self.sweep_old_ids() + logger.info("itd_511 poll completed", + extra={"events_yielded": yielded, + "events_raw": len(events_raw or []), + "advisories_raw": len(adv_raw or []) if do_advisories else None}) + + def subject_for(self, event: Event) -> str: + et = event.category.split(".", 1)[0] + return f"central.traffic.{et}.us.id" diff --git a/src/central/adapters/itd_511_cameras.py b/src/central/adapters/itd_511_cameras.py new file mode 100644 index 0000000..3238e26 --- /dev/null +++ b/src/central/adapters/itd_511_cameras.py @@ -0,0 +1,249 @@ +"""ITD 511 cameras adapter — telemetry. + +Polls /api/v2/get/cameras (664 cameras in the v0.10.0 step-0 snapshot) and +emits one telemetry Event per camera per UTC day. ITD aggregates border-region +mirrors from neighbouring DOTs (UDOT, ODOT, WYDOT, WSDOT, NDot, MTD, DriveBC, +Lemhi County) plus its own ITDNET/ACHD/RWIS/Idaho511 feeds; ~1.2% of cameras +sit outside Idaho but ride the same feed. + +Per the v0.10.0 finding-4 architectural call: region tagged uniformly US-ID; +``data.source_jurisdiction`` carries the raw upstream ``Source`` field so +border-region cameras remain distinguishable for downstream consumers (lat/lon +bounds-checking deferred to a later release). + +Subject convention (v0.9.20 forward, locked v0.10.0 architectural call A): + central.traffic_cameras.us.id. + +Dedup id: ``idaho_511:cam::`` — one telemetry event per +camera per UTC day, matching the state_511_atis_cameras precedent. Image URLs +ship straight through; the partial renders from Views[0].Url +(jpeg/gif/png all confirmed publicly reachable). + +Retry predicate matches itd_511's (architectural call B): no retry on 4xx +except 429 with Retry-After. Shares the ``_Transient`` sentinel and +``_MAX_RETRY_AFTER_S`` cap from the sibling module. +""" + +import asyncio +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + after_nothing, + retry, + retry_if_exception_type, + stop_after_attempt, +) + +from central.adapter import SourceAdapter +from central.adapters.itd_511 import _MAX_RETRY_AFTER_S, _Transient, _wait_strategy +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +_BASE_URL = "https://511.idaho.gov/api/v2/get" +_FETCH_TIMEOUT_S = 30 + +_DEDUP_DDL = ( + "CREATE TABLE IF NOT EXISTS published_ids (" + "adapter TEXT NOT NULL, event_id TEXT NOT NULL, " + "first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "PRIMARY KEY (adapter, event_id))" +) + +# Source values that are native ITD-internal feeds (not cross-DOT border-region +# mirrors). Drives the row-partial "(cross-DOT mirror)" annotation via the +# data.is_native_source boolean computed in _build_event — per +# feedback_no_hardcoding, the partial must not carry the allow-list itself. +NATIVE_SOURCES = frozenset({"ITDNET", "Idaho511", "ACHD", "RWIS"}) + + +class Itd511CamerasSettings(BaseModel): + """api_key_alias: config.api_keys entry (default 'idaho_511', shared with + the itd_511 event adapter).""" + + api_key_alias: str = "idaho_511" + + +class Itd511CamerasAdapter(SourceAdapter): + """Idaho 511 official cameras directory adapter (telemetry).""" + + name = "itd_511_cameras" + display_name = "Idaho 511 Cameras (Official API)" + description = ( + "Idaho Transportation Department's traffic camera directory (664 cameras " + "statewide plus border-region cross-DOT mirrors). One telemetry event " + "per camera per UTC day; the detail drawer streams the live image " + "direct from the source." + ) + settings_schema = Itd511CamerasSettings + requires_api_key = "idaho_511" + api_key_field = "api_key_alias" + wizard_order = None # Ships disabled + default_cadence_s = 600 + data_class = "telemetry" + enrichment_locations = [("latitude", "longitude")] + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._api_key_alias: str = config.settings.get("api_key_alias", "idaho_511") + self._api_key: str | None = None + + def _redact(self, text: str) -> str: + return text.replace(self._api_key, "") if self._api_key else text + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_S), + headers={"User-Agent": "Central/0.10 (+itd_511_cameras)"}, + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(_DEDUP_DDL) + self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)") + self._db.commit() + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info("itd_511_cameras adapter started", + extra={"alias": self._api_key_alias, "api_key_present": bool(self._api_key)}) + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + + async def apply_config(self, new_config: AdapterConfig) -> None: + self._api_key_alias = new_config.settings.get("api_key_alias", "idaho_511") + self._api_key = await self._config_store.get_api_key(self._api_key_alias) + logger.info("itd_511_cameras config updated", + extra={"alias": self._api_key_alias, "api_key_present": bool(self._api_key)}) + + @retry( + stop=stop_after_attempt(3), + wait=_wait_strategy, + retry=retry_if_exception_type((_Transient, aiohttp.ClientConnectionError, + asyncio.TimeoutError, TimeoutError)), + reraise=True, # propagate _Transient/ClientError verbatim, not RetryError + before_sleep=None, # explicit per BUG D5 audit: tenacity must not log between attempts + after=after_nothing, # explicit per BUG D5 audit: tenacity must not log after attempts + ) + async def _fetch_cameras(self) -> list[dict[str, Any]] | None: + """GET /api/v2/get/cameras?key=. Same retry policy as itd_511 + (the _wait_strategy + _Transient(wait_s=...) shape avoids the BUG C + double-wait on Retry-After).""" + if self._session is None: + raise RuntimeError("itd_511_cameras session not started") + async with self._session.get( + f"{_BASE_URL}/cameras", params={"key": self._api_key}, + ) as resp: + if resp.status == 200: + return await resp.json(content_type=None) + body_preview = self._redact((await resp.text())[:200]) + if resp.status == 429: + ra = resp.headers.get("Retry-After", "") + wait_s = min(int(ra) if ra.isdigit() else 5, _MAX_RETRY_AFTER_S) + logger.warning("itd_511_cameras rate-limited", + extra={"retry_after": wait_s, "body": body_preview}) + raise _Transient(f"429 retry-after={wait_s}", wait_s=wait_s) + if 500 <= resp.status < 600: + logger.warning("itd_511_cameras upstream 5xx", + extra={"status": resp.status, "body": body_preview}) + raise _Transient(f"{resp.status} server error") + logger.warning("itd_511_cameras permanent error; skipping poll", + extra={"status": resp.status, "body": body_preview}) + return None + + def _build_event(self, cam: dict[str, Any]) -> Event | None: + cam_id = cam.get("Id") + if cam_id is None: + return None + views = cam.get("Views") or [] + image_url = views[0].get("Url") if views else None + lat, lon = cam.get("Latitude"), cam.get("Longitude") + now = datetime.now(timezone.utc) + day = now.strftime("%Y-%m-%d") + source = cam.get("Source") + return Event( + id=f"idaho_511:cam:{cam_id}:{day}", + adapter=self.name, + category="camera.itd_511_cameras", + time=now, + severity=1, # telemetry; cameras carry no severity signal + geo=Geo( + centroid=((lon, lat) if lat is not None and lon is not None else None), + regions=["US-ID"], + primary_region="US-ID", + ), + data={ + "camera_id": cam_id, + "roadway": cam.get("Roadway"), + "direction": cam.get("Direction"), + "location": cam.get("Location"), + "source": source, + "source_jurisdiction": source, # alias per v0.10.0 finding 4 + # Drives the row-partial "(cross-DOT mirror)" annotation + # without hardcoding the allow-list at the template layer + # (BUG D2 / [[feedback_no_hardcoding]]). + "is_native_source": source in NATIVE_SOURCES, + "source_id_upstream": cam.get("SourceId"), + "image_url": image_url, + "additional_views": [v.get("Url") for v in views[1:]], + "view_count": len(views), + "view_descriptions": [v.get("Description") for v in views], + "sort_order": cam.get("SortOrder"), + "latitude": lat, + "longitude": lon, + }, + ) + + async def poll(self) -> AsyncIterator[Event]: + if not self._session: + raise RuntimeError("Session not initialized") + if not self._api_key: + logger.warning("itd_511_cameras: no API key for alias; skipping poll", + extra={"alias": self._api_key_alias}) + return + try: + cams = await self._fetch_cameras() + except (_Transient, aiohttp.ClientError, TimeoutError) as exc: + # BUG E fix: _Transient must be caught alongside ClientError — + # else tenacity's reraise of a persistent 5xx/429 (after exhausted + # retries) crashes the whole poll cycle. + logger.warning("itd_511_cameras fetch failed", + extra={"error": self._redact(str(exc))}) + return + yielded = 0 + for cam in (cams or []): + try: + ev = self._build_event(cam) + except Exception: + logger.exception("itd_511_cameras parse failed", + extra={"id": cam.get("Id")}) + continue + if ev is not None: + yield ev + yielded += 1 + self.sweep_old_ids() + logger.info("itd_511_cameras poll completed", + extra={"cameras_yielded": yielded, "cameras_raw": len(cams or [])}) + + def subject_for(self, event: Event) -> str: + return f"central.traffic_cameras.us.id.{event.data.get('camera_id')}" diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index ec75563..64635d8 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2851,7 +2851,7 @@ ADAPTER_GROUPS = { "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons"], "Geophysical": ["usgs_quake", "nwis"], "Earth Observation": ["eonet"], - "Transportation": ["wzdx", "state_511_atis", "tomtom_flow", "tomtom_incidents", "state_511_atis_cameras"], + "Transportation": ["wzdx", "state_511_atis", "tomtom_flow", "tomtom_incidents", "state_511_atis_cameras", "itd_511", "itd_511_cameras"], } # Same palette the map legend uses, indexed by sorted-adapter position. EVENTS_PALETTE = [ diff --git a/src/central/gui/templates/_event_rows/itd_511.html b/src/central/gui/templates/_event_rows/itd_511.html new file mode 100644 index 0000000..78f7b38 --- /dev/null +++ b/src/central/gui/templates/_event_rows/itd_511.html @@ -0,0 +1,16 @@ +{# ITD 511 detail rows. Fields from payload->data->data; every block guarded. + Direction "Unknown"/"None" suppressed (wzdx lesson). Recurrence HTML + stripped via striptags so the source's /
markup doesn't leak. #} +{%- set d = (event.data.get('data') or {}).get('data') or {} -%} +{% if d.get('roadway_name') %}
Road
{{ d.roadway_name }}{% if d.get('direction') and d.direction not in ['Unknown', 'None'] %} ({{ d.direction }}){% endif %}
{% endif %} +{% if d.get('event_sub_type') %}
Type
{{ d.event_sub_type }}
{% endif %} +{% if d.get('lanes_affected') and d.lanes_affected != 'No Data' %}
Lanes
{{ d.lanes_affected }}
{% endif %} +{% if d.get('is_full_closure') %}
Full closure
Yes
{% endif %} +{% if d.get('itd_severity') and d.itd_severity != 'None' %}
ITD severity
{{ d.itd_severity }}
{% endif %} +{% if d.get('description') %}
Description
{{ d.description | truncate(220) }}
{% endif %} +{% if d.get('comment') %}
Comment
{{ d.comment | truncate(220) }}
{% endif %} +{% if d.get('cause') and d.cause != d.get('event_type_short') %}
Cause
{{ d.cause }}
{% endif %} +{% if d.get('detour_instructions') %}
Detour
{{ d.detour_instructions | truncate(220) }}
{% endif %} +{% if d.get('recurrence_text') %}
Schedule
{{ d.recurrence_text | striptags | truncate(120) }}
{% endif %} +{%- set r = d.get('restrictions') or {} -%} +{% if r.get('Width') or r.get('Height') or r.get('Length') or r.get('Weight') or r.get('Speed') %}
Restrictions
{% if r.Width %}W:{{ r.Width }} {% endif %}{% if r.Height %}H:{{ r.Height }} {% endif %}{% if r.Length %}L:{{ r.Length }} {% endif %}{% if r.Weight %}Wt:{{ r.Weight }} {% endif %}{% if r.Speed %}Spd:{{ r.Speed }}{% endif %}
{% endif %} diff --git a/src/central/gui/templates/_event_rows/itd_511_cameras.html b/src/central/gui/templates/_event_rows/itd_511_cameras.html new file mode 100644 index 0000000..ac94ad7 --- /dev/null +++ b/src/central/gui/templates/_event_rows/itd_511_cameras.html @@ -0,0 +1,10 @@ +{# ITD 511 camera detail. Live fetched direct from the source (publicly + reachable, no auth). Format may be jpeg/gif/png — handles all. + source_jurisdiction surfaced for cross-DOT border-region cameras (UDOT/ + ODOT/etc.) per v0.10.0 finding 4. Fields from payload->data->data. #} +{%- set d = (event.data.get('data') or {}).get('data') or {} -%} +{% if d.get('image_url') %}
View
Camera view
{% endif %} +{% if d.get('roadway') %}
Road
{{ d.roadway }}{% if d.get('direction') and d.direction not in ['Unknown', 'None'] %} ({{ d.direction }}){% endif %}
{% endif %} +{% if d.get('location') %}
Location
{{ d.location }}
{% endif %} +{% if d.get('source_jurisdiction') and not d.get('is_native_source') %}
Source
{{ d.source_jurisdiction }} (cross-DOT mirror)
{% endif %} +{% if d.get('view_count') and d.view_count > 1 %}
Views
{{ d.view_count }} camera angles
{% endif %} diff --git a/src/central/gui/templates/_event_summaries/itd_511.html b/src/central/gui/templates/_event_summaries/itd_511.html new file mode 100644 index 0000000..f9c53f8 --- /dev/null +++ b/src/central/gui/templates/_event_summaries/itd_511.html @@ -0,0 +1,9 @@ +{# ITD 511 one-line subject. event_type_short ∈ {work_zone, closure, incident, + special_event, advisory}. Fields from payload->data->data. Drops + "Unknown"/"None" direction tokens per the v0.9.1 wzdx lesson. #} +{%- set d = (event.data.get('data') or {}).get('data') or {} -%} +{%- set labels = {'work_zone': 'Road work', 'closure': 'Closure', 'incident': 'Incident', 'special_event': 'Special event', 'advisory': 'Advisory'} -%} +{%- set lead = labels.get(d.get('event_type_short'), 'Traffic event') -%} +{%- set dir = d.get('direction') -%} +{%- set show_dir = dir and dir not in ['Unknown', 'None'] -%} +{{ lead }}{% if d.get('roadway_name') %} on {{ d.roadway_name }}{% if show_dir %} {{ dir }}{% endif %}{% endif %}{% if d.get('is_full_closure') %} (full closure){% endif %} diff --git a/src/central/gui/templates/_event_summaries/itd_511_cameras.html b/src/central/gui/templates/_event_summaries/itd_511_cameras.html new file mode 100644 index 0000000..9f74b59 --- /dev/null +++ b/src/central/gui/templates/_event_summaries/itd_511_cameras.html @@ -0,0 +1,5 @@ +{# ITD 511 camera one-line subject. Location field is already humanized (e.g. + "I-15 UT/ID State Line UT"), so it stands alone. Falls back to roadway then + camera_id. Fields from payload->data->data. #} +{%- set d = (event.data.get('data') or {}).get('data') or {} -%} +Camera: {{ d.get('location') or d.get('roadway') or ('#' ~ d.get('camera_id')) }} diff --git a/tests/fixtures/itd_511_alerts_sample.json b/tests/fixtures/itd_511_alerts_sample.json new file mode 100644 index 0000000..fe51488 --- /dev/null +++ b/tests/fixtures/itd_511_alerts_sample.json @@ -0,0 +1 @@ +[] diff --git a/tests/fixtures/itd_511_cameras_sample.json b/tests/fixtures/itd_511_cameras_sample.json new file mode 100644 index 0000000..9477443 --- /dev/null +++ b/tests/fixtures/itd_511_cameras_sample.json @@ -0,0 +1,90 @@ +[ + { + "Id": 3, + "Source": "ITDNET", + "SourceId": "1000", + "Roadway": "SH-55 Eagle", + "Direction": "Southbound", + "Latitude": 43.619167, + "Longitude": -116.35478, + "Location": "SH-55 Eagle Fairview", + "SortOrder": 0, + "Views": [ + { + "Id": 1039, + "Url": "https://511.idaho.gov/map/Cctv/1039", + "Status": "Disabled", + "Description": "D3 SH-55 37.9 Eagle Fairview 526" + } + ] + }, + { + "Id": 436, + "Source": "ACHD", + "SourceId": "4001", + "Roadway": "Local Boise", + "Direction": "Unknown", + "Latitude": 43.60304, + "Longitude": -116.18841, + "Location": "Park Parkcenter Front Clearwater", + "SortOrder": 0, + "Views": [ + { + "Id": 631, + "Url": "https://511.idaho.gov/map/Cctv/631", + "Status": "Enabled", + "Description": "" + } + ] + }, + { + "Id": 1, + "Source": "UDOT", + "SourceId": "10.C1", + "Roadway": "I-15", + "Direction": "Unknown", + "Latitude": 42.0011, + "Longitude": -112.198, + "Location": "I-15 UT/ID State Line UT", + "SortOrder": 1, + "Views": [ + { + "Id": 1, + "Url": "https://511.idaho.gov/map/Cctv/1", + "Status": "Enabled", + "Description": "N/A" + } + ] + }, + { + "Id": 2, + "Source": "RWIS", + "SourceId": "100.C1", + "Roadway": "SH-75", + "Direction": "Unknown", + "Latitude": 43.5946, + "Longitude": -114.345, + "Location": "SH-75 Wood River", + "SortOrder": 325, + "Views": [ + { + "Id": 2, + "Url": "https://511.idaho.gov/map/Cctv/2", + "Status": "Enabled", + "Description": "" + }, + { + "Id": 3, + "Url": "https://511.idaho.gov/map/Cctv/3", + "Status": "Enabled", + "Description": "" + }, + { + "Id": 4, + "Url": "https://511.idaho.gov/map/Cctv/4", + "Status": "Enabled", + "Description": "" + } + ] + } +] diff --git a/tests/fixtures/itd_511_event_sample.json b/tests/fixtures/itd_511_event_sample.json new file mode 100644 index 0000000..1e9f689 --- /dev/null +++ b/tests/fixtures/itd_511_event_sample.json @@ -0,0 +1,214 @@ +[ + { + "ID": 23, + "SourceId": "4277", + "Organization": "ERS", + "RoadwayName": "SH-81", + "DirectionOfTravel": "Unknown", + "Description": " Work on the shoulder on SH-81 near Poverty Gulch. 7/8/2024 9:29 PM Mon: 12:00 PM - 5:00 PM, Tue, Wed, Thu, Fri, Sat, Sun: Active all day Activities: use caution, warning.", + "Reported": 1720495740, + "LastUpdated": 1749675731, + "StartDate": 1720495740, + "PlannedEndDate": null, + "LanesAffected": "No Data", + "Latitude": 42.5168038430856, + "Longitude": -113.711287649613, + "LatitudeSecondary": null, + "LongitudeSecondary": null, + "EventType": "roadwork", + "EventSubType": "workOnTheShoulder", + "IsFullClosure": false, + "Severity": "None", + "Comment": null, + "EncodedPolyline": null, + "Restrictions": { + "Width": null, + "Height": null, + "Length": null, + "Weight": null, + "Speed": null + }, + "DetourPolyline": "", + "DetourInstructions": "", + "Recurrence": "Mon:
12:00 PM - 5:00 PM

Tue, Wed, Thu, Fri, Sat, Sun:
Active all day

", + "RecurrenceSchedules": [ + { + "StartDate": "7/8/2024 9:29:00 PM-06:00:00", + "EndDate": null, + "Times": [ + { + "StartTime": "12:00:00-06:00:00", + "EndTime": "16:59:59-06:00:00" + } + ], + "DaysOfWeek": [ + "Monday" + ] + }, + { + "StartDate": "7/8/2024 9:29:00 PM-06:00:00", + "EndDate": null, + "Times": [ + { + "StartTime": "00:00:00-06:00:00", + "EndTime": "23:59:59-06:00:00" + } + ], + "DaysOfWeek": [ + "Tuesday", + "Wednesday", + "Thursday", + "Friday", + "Saturday", + "Sunday" + ] + } + ], + "Cause": "roadwork" + }, + { + "ID": 17, + "SourceId": "469", + "Organization": "ERS", + "RoadwayName": "N McDermott Rd", + "DirectionOfTravel": "Both", + "Description": " Long term road construction on N McDermott Rd Both Directions from Five Mile Creek to US-20. All lanes closed. 1/30/2023 2:24 PM Mon, Tue, Wed, Thu, Fri, Sat, Sun: Active all day", + "Reported": 1675113840, + "LastUpdated": 1718758080, + "StartDate": 1675113840, + "PlannedEndDate": null, + "LanesAffected": "All lanes closed", + "Latitude": 43.6485700000001, + "Longitude": -116.47349, + "LatitudeSecondary": 43.6630000000001, + "LongitudeSecondary": -116.47421, + "EventType": "closures", + "EventSubType": "longTermRoadConstruction", + "IsFullClosure": true, + "Severity": "None", + "Comment": "Open to local traffic only.", + "EncodedPolyline": "qbliGhv{eUsk@MeTEsBAsAGiAQw@Qi@MoAYmA]aA]}@a@[?[JSNKNENCTEzCGnAIX", + "Restrictions": { + "Width": null, + "Height": null, + "Length": null, + "Weight": null, + "Speed": null + }, + "DetourPolyline": "", + "DetourInstructions": "", + "Recurrence": "Mon, Tue, Wed, Thu, Fri, Sat, Sun:
Active all day

", + "RecurrenceSchedules": [ + { + "StartDate": "1/30/2023 2:24:00 PM-06:00:00", + "EndDate": null, + "Times": [ + { + "StartTime": "00:00:00-06:00:00", + "EndTime": "23:59:59-06:00:00" + } + ], + "DaysOfWeek": [ + "Monday", + "Tuesday", + "Wednesday", + "Thursday", + "Friday", + "Saturday", + "Sunday" + ] + } + ], + "Cause": "roadwork" + }, + { + "ID": 34585, + "SourceId": "11146", + "Organization": "ERS", + "RoadwayName": "I-84", + "DirectionOfTravel": "East", + "Description": " Left Lane Blocked on I-84 Eastbound near N Franklin Blvd. 2 Left Lanes Blocked. Activities: Expect Delays, Look Out for Flagger, Use Caution.", + "Reported": 1780543740, + "LastUpdated": 1780543820, + "StartDate": 1780543740, + "PlannedEndDate": null, + "LanesAffected": "2 Left Lanes Blocked", + "Latitude": 43.5980897425697, + "Longitude": -116.542818286917, + "LatitudeSecondary": null, + "LongitudeSecondary": null, + "EventType": "accidentsAndIncidents", + "EventSubType": "leftLaneBlocked", + "IsFullClosure": false, + "Severity": "None", + "Comment": "The two left lanes of eastbound I-84 will be blocked near milepost 36.5 tonight for road maintenance. Keep Right.", + "EncodedPolyline": null, + "Restrictions": { + "Width": null, + "Height": null, + "Length": null, + "Weight": null, + "Speed": null + }, + "DetourPolyline": "", + "DetourInstructions": "", + "Recurrence": "", + "RecurrenceSchedules": "", + "Cause": "Incident" + }, + { + "ID": 33663, + "SourceId": "11000", + "Organization": "ERS", + "RoadwayName": "SH-16", + "DirectionOfTravel": "Both", + "Description": " Special event on SH-16 Both Directions at W Chaparral Rd. 6/8/2026 8:00 AM to 6/15/2026 5:00 PM Mon, Tue, Wed, Thu, Fri, Sat, Sun: Active all day Activities: Use Caution.", + "Reported": 1780927200, + "LastUpdated": 1779805341, + "StartDate": 1780927200, + "PlannedEndDate": 1781564400, + "LanesAffected": "No Data", + "Latitude": 43.780078841046, + "Longitude": -116.473209123902, + "LatitudeSecondary": null, + "LongitudeSecondary": null, + "EventType": "specialEvents", + "EventSubType": "specialEvent", + "IsFullClosure": false, + "Severity": "None", + "Comment": "Rodeo Traffic entering/leaving roadway ", + "EncodedPolyline": null, + "Restrictions": { + "Width": null, + "Height": null, + "Length": null, + "Weight": null, + "Speed": null + }, + "DetourPolyline": "", + "DetourInstructions": "", + "Recurrence": "Mon, Tue, Wed, Thu, Fri, Sat, Sun:
Active all day

", + "RecurrenceSchedules": [ + { + "StartDate": "6/8/2026 8:00:00 AM-06:00:00", + "EndDate": "6/15/2026 5:00:00 PM-06:00:00", + "Times": [ + { + "StartTime": "00:00:00-06:00:00", + "EndTime": "23:59:59-06:00:00" + } + ], + "DaysOfWeek": [ + "Monday", + "Tuesday", + "Wednesday", + "Thursday", + "Friday", + "Saturday", + "Sunday" + ] + } + ], + "Cause": "specialEvents" + } +] diff --git a/tests/test_events_feed_frontend.py b/tests/test_events_feed_frontend.py index ffdfcb2..6639287 100644 --- a/tests/test_events_feed_frontend.py +++ b/tests/test_events_feed_frontend.py @@ -1147,6 +1147,8 @@ _SAMPLE_INNER = { "tomtom_flow": {"road_category": "primary", "relative_speed": 0.11}, "tomtom_incidents": {"description": "Roadworks", "from": "Early Road", "to": "Slade Road"}, "state_511_atis_cameras": {"location_description": "I-84 Mountain Home", "camera_id": 42}, + "itd_511": {"event_type_short": "work_zone", "roadway_name": "I-84"}, + "itd_511_cameras": {"location": "I-84 Mountain Home", "camera_id": 42}, } # Exact expected subjects for the deterministic adapters. swpc_alerts is omitted @@ -1169,6 +1171,8 @@ _EXPECTED_SUBJECT = { "tomtom_flow": "Traffic flow (primary) — 11% of free-flow", "tomtom_incidents": "Roadworks on Early Road → Slade Road", "state_511_atis_cameras": "Camera: I-84 Mountain Home", + "itd_511": "Road work on I-84", + "itd_511_cameras": "Camera: I-84 Mountain Home", } diff --git a/tests/test_itd_511.py b/tests/test_itd_511.py new file mode 100644 index 0000000..50f8fc3 --- /dev/null +++ b/tests/test_itd_511.py @@ -0,0 +1,398 @@ +"""Tests for the itd_511 adapter (v0.10.0). + +Fixtures are real captures from https://511.idaho.gov/api/v2/get/event,alerts +trimmed to one record per EventType plus an empty advisories baseline: + tests/fixtures/itd_511_event_sample.json + tests/fixtures/itd_511_alerts_sample.json + +No conftest entry: dedup uses the supervisor-injected cursors.db (inherited +mixin); polling is stateless. +""" + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import aiohttp +import pytest + +from central.adapter import SourceAdapter +from central.adapters.itd_511 import ( + EVENT_TYPE_MAP, + Itd511Adapter, + _build_geometry, + _decode_polyline, + _itd_severity, + _parse_epoch, + _strip_or_none, + _Transient, + _wait_strategy, +) +from central.config_models import AdapterConfig +from central.models import Event, Geo + +FIX = Path(__file__).parent / "fixtures" +EVENT = json.loads((FIX / "itd_511_event_sample.json").read_text()) +ALERTS = json.loads((FIX / "itd_511_alerts_sample.json").read_text()) +BY_TYPE = {r["EventType"]: r for r in EVENT} + + +def _cfg(): + return AdapterConfig( + name="itd_511", enabled=True, cadence_s=60, + settings={"api_key_alias": "idaho_511"}, + updated_at=datetime.now(timezone.utc), + ) + + +@pytest.fixture +def adapter(tmp_path): + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value="testkey-32chars-deadbeefdeadbeef") + return Itd511Adapter(_cfg(), cs, tmp_path / "cursors.db") + + +def test_event_type_map_is_complete(): + assert EVENT_TYPE_MAP == { + "roadwork": "work_zone", "closures": "closure", + "accidentsAndIncidents": "incident", "specialEvents": "special_event", + } + + +def test_parse_epoch(): + assert _parse_epoch(1675113840) == datetime(2023, 1, 30, 21, 24, tzinfo=timezone.utc) + assert _parse_epoch(None) is None + assert _parse_epoch("not-an-int") is None + assert _parse_epoch("1675113840") == datetime(2023, 1, 30, 21, 24, tzinfo=timezone.utc) + + +@pytest.mark.parametrize("sev,fc,expected", [ + ("None", False, 1), ("Minor", False, 2), ("Major", False, 3), + ("None", True, 3), ("Minor", True, 3), ("Major", True, 3), # full-closure forces 3 + (None, False, 1), ("Bogus", False, 1), +]) +def test_severity_mapping(sev, fc, expected): + assert _itd_severity(sev, fc) == expected + + +def test_strip_or_none_handles_eventsubtype_trailing_space(): + assert _strip_or_none("pavementMarkingOperations ") == "pavementMarkingOperations" + assert _strip_or_none("") is None + assert _strip_or_none(" ") is None + assert _strip_or_none(None) is None + assert _strip_or_none(42) == 42 # non-string passthrough + + +def test_decode_polyline_roundtrip(): + import polyline as polyline_lib + enc = polyline_lib.encode([(43.6, -116.5), (43.7, -116.4)]) + assert _decode_polyline(enc) == [(43.6, -116.5), (43.7, -116.4)] + assert _decode_polyline(None) == [] + assert _decode_polyline("") == [] + # malformed string => library raises => caught => [] + assert _decode_polyline("\x00\x00\x00not-a-polyline") == [] + + +def test_build_geometry_polyline_wins(): + import polyline as polyline_lib + enc = polyline_lib.encode([(43.6, -116.5), (43.7, -116.4)]) + geom, centroid = _build_geometry(40.0, -100.0, None, None, enc) + assert geom["type"] == "LineString" + assert len(geom["coordinates"]) == 2 + assert centroid == geom["coordinates"][0] # first vertex (lon, lat) order + + +def test_build_geometry_bookend_linestring(): + geom, centroid = _build_geometry(43.6, -116.5, 43.7, -116.4, None) + assert geom == {"type": "LineString", + "coordinates": [(-116.5, 43.6), (-116.4, 43.7)]} + assert centroid == (-116.5, 43.6) + + +def test_build_geometry_point_only(): + geom, centroid = _build_geometry(43.6, -116.5, None, None, None) + assert geom == {"type": "Point", "coordinates": (-116.5, 43.6)} + assert centroid == (-116.5, 43.6) + + +def test_build_geometry_missing_all(): + assert _build_geometry(None, None, None, None, None) == (None, None) + + +@pytest.mark.parametrize("etype,short", [ + ("roadwork", "work_zone"), ("closures", "closure"), + ("accidentsAndIncidents", "incident"), ("specialEvents", "special_event"), +]) +def test_build_event_category_and_dedup_id(adapter, etype, short): + rec = BY_TYPE[etype] + e = adapter._build_event_record(rec) + assert e.category == f"{short}.itd_511" + assert e.id == f"idaho_511:event:{rec['SourceId']}" + assert e.adapter == "itd_511" + assert e.geo.primary_region == "US-ID" + assert e.geo.regions == ["US-ID"] + + +def test_build_event_closure_has_linestring_geometry(adapter): + e = adapter._build_event_record(BY_TYPE["closures"]) + assert e.geo.geometry is not None + assert e.geo.geometry["type"] == "LineString" + assert len(e.geo.geometry["coordinates"]) >= 2 + + +def test_build_event_full_closure_forces_severity_3(adapter): + e = adapter._build_event_record(BY_TYPE["closures"]) + assert e.data["is_full_closure"] is True + assert e.severity == 3 + + +def test_build_event_unknown_event_type_falls_back_to_incident(adapter): + rec = {**BY_TYPE["roadwork"], "EventType": "WhoKnows", "SourceId": "X1"} + e = adapter._build_event_record(rec) + assert e.category == "incident.itd_511" + + +def test_build_event_dedup_id_falls_back_to_id_when_sourceid_missing(adapter): + rec = {**BY_TYPE["roadwork"], "SourceId": None, "ID": 99999} + e = adapter._build_event_record(rec) + assert e.id == "idaho_511:event:99999" + + +def test_build_event_returns_none_without_any_id(adapter): + rec = {**BY_TYPE["roadwork"], "SourceId": None, "ID": None} + assert adapter._build_event_record(rec) is None + + +def test_build_event_strips_trailing_space_on_event_sub_type(adapter): + rec = {**BY_TYPE["roadwork"], "EventSubType": "pavementMarkingOperations "} + e = adapter._build_event_record(rec) + assert e.data["event_sub_type"] == "pavementMarkingOperations" + + +def test_build_event_captures_cause_and_organization(adapter): + e = adapter._build_event_record(BY_TYPE["roadwork"]) + assert e.data["cause"] == BY_TYPE["roadwork"]["Cause"] + assert e.data["organization"] == BY_TYPE["roadwork"]["Organization"] + + +def test_build_event_passes_through_recurrence_and_restrictions(adapter): + e = adapter._build_event_record(BY_TYPE["closures"]) + assert e.data["recurrence_schedules"] == BY_TYPE["closures"]["RecurrenceSchedules"] + assert e.data["restrictions"] == BY_TYPE["closures"]["Restrictions"] + + +@pytest.mark.parametrize("short,expected_subject", [ + ("work_zone", "central.traffic.work_zone.us.id"), + ("closure", "central.traffic.closure.us.id"), + ("incident", "central.traffic.incident.us.id"), + ("special_event", "central.traffic.special_event.us.id"), + ("advisory", "central.traffic.advisory.us.id"), +]) +def test_subject_for(adapter, short, expected_subject): + e = Event(id="x", adapter="itd_511", category=f"{short}.itd_511", + time=datetime.now(timezone.utc), severity=1, geo=Geo(), data={}) + assert adapter.subject_for(e) == expected_subject + + +def test_advisory_structural_passthrough(adapter): + # Synthesize an advisory (ITD currently returns []); per-record try/except + # in poll() means downstream surprises won't sink the cycle. + rec = {"SourceId": "ADV-1", "Description": "Snow event in central Idaho", + "Latitude": 44.0, "Longitude": -114.5, "Reported": 1780500000} + e = adapter._build_advisory_record(rec) + assert e is not None + assert e.category == "advisory.itd_511" + assert e.id == "idaho_511:advisory:ADV-1" + assert e.data["advisory"] == rec # full pass-through, schema-free + assert e.data["latitude"] == 44.0 + assert e.data["event_type_short"] == "advisory" + + +def test_advisory_returns_none_without_any_id(adapter): + assert adapter._build_advisory_record({"Description": "no id"}) is None + + +@pytest.mark.asyncio +async def test_poll_yields_events_from_both_endpoints(adapter): + await adapter.startup() + adapter._fetch = AsyncMock(side_effect=lambda ep: {"event": EVENT, "alerts": ALERTS}[ep]) + events = [e async for e in adapter.poll()] + await adapter.shutdown() + # alerts fixture is [] so events == EVENT count + assert len(events) == len(EVENT) + assert all(e.adapter == "itd_511" for e in events) + assert {e.category for e in events} == { + "work_zone.itd_511", "closure.itd_511", + "incident.itd_511", "special_event.itd_511", + } + + +@pytest.mark.asyncio +async def test_poll_advisory_cadence_throttles_alerts_endpoint(adapter): + """Advisories poll on the 0th, 5th, 10th... event-poll (5x throttle).""" + await adapter.startup() + calls: list[str] = [] + + async def fake_fetch(ep): + calls.append(ep) + return EVENT if ep == "event" else ALERTS + + adapter._fetch = fake_fetch + for _ in range(6): + [_ async for _ in adapter.poll()] + await adapter.shutdown() + # 6 polls: events every time, alerts on poll 0 and poll 5 + assert calls.count("event") == 6 + assert calls.count("alerts") == 2 + + +@pytest.mark.asyncio +async def test_poll_skips_cleanly_without_api_key(tmp_path): + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value=None) + adapter = Itd511Adapter(_cfg(), cs, tmp_path / "cursors.db") + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + assert events == [] # no fetch, clean skip per tomtom_flow precedent + + +@pytest.mark.asyncio +async def test_key_never_leaks_in_error_path(adapter, caplog): + """The key travels in ?key=, so aiohttp's default error messages include + the full URL; every error-log path must run through self._redact(). + Regression guard via caplog inspection. NOTE: ``caplog.text`` only contains + the message field — structured ``extra={}`` kwargs land as attributes on + the LogRecord, so we inspect both surfaces.""" + await adapter.startup() + key_value = adapter._api_key # the testkey set up in the adapter fixture + assert key_value and len(key_value) > 16 + + async def boom(endpoint): + raise aiohttp.ClientConnectionError( + f"Cannot connect to host 511.idaho.gov ssl:default [key={key_value}]" + ) + + adapter._fetch = boom + with caplog.at_level(logging.WARNING, logger="central.adapters.itd_511"): + events = [e async for e in adapter.poll()] + await adapter.shutdown() + assert events == [] + surfaces = [r.getMessage() for r in caplog.records] + surfaces.extend(str(getattr(r, "error", "")) for r in caplog.records) + joined = " ".join(surfaces) + assert key_value not in joined, f"key leaked to log: {joined!r}" + assert "" in joined # redaction marker proves _redact() actually fired + + +def test_inherits_dedup_mixin_from_source_adapter(): + for m in ("is_published", "mark_published", "sweep_old_ids"): + assert m not in Itd511Adapter.__dict__, f"redefines {m}" + assert getattr(Itd511Adapter, m) is getattr(SourceAdapter, m) + + +def test_summary_partial_renders_per_event_type(): + from central.gui.routes import _derive_subject + cases = [ + ({"event_type_short": "work_zone", "roadway_name": "I-84"}, + "Road work on I-84"), + ({"event_type_short": "closure", "roadway_name": "N McDermott Rd", + "direction": "Both", "is_full_closure": True}, + "Closure on N McDermott Rd Both (full closure)"), + ({"event_type_short": "incident", "roadway_name": "I-84", + "direction": "East"}, + "Incident on I-84 East"), + ({"event_type_short": "advisory"}, "Advisory"), + # Drops "Unknown" direction per wzdx lesson + ({"event_type_short": "work_zone", "roadway_name": "I-84", + "direction": "Unknown"}, + "Road work on I-84"), + ] + for inner, expected in cases: + row = {"adapter": "itd_511", "data": {"data": {"data": inner}}} + assert _derive_subject(row) == expected, f"mismatch for {inner!r}" + + +def test_class_attributes_match_spec(): + assert Itd511Adapter.name == "itd_511" + assert Itd511Adapter.data_class == "event" + assert Itd511Adapter.requires_api_key == "idaho_511" + assert Itd511Adapter.api_key_field == "api_key_alias" + assert Itd511Adapter.default_cadence_s == 60 + assert Itd511Adapter.wizard_order is None + assert Itd511Adapter.enrichment_locations == [("latitude", "longitude")] + + +# --- BUG C: 429 Retry-After must drive the wait directly; no double-sleep -- + +def test_transient_carries_wait_s(): + t = _Transient("429 retry-after=42", wait_s=42) + assert t.wait_s == 42 + assert str(t) == "429 retry-after=42" + assert _Transient("5xx").wait_s is None # default omits + + +def test_wait_strategy_honors_transient_wait_s(): + """BUG C regression: a 429 Retry-After must drive the wait directly via + _Transient.wait_s; tenacity must NOT also wait its exponential jitter on + top (the previous shape did both, blocking ~120s+ per cycle).""" + retry_state = MagicMock() + outcome = MagicMock() + outcome.exception.return_value = _Transient("429", wait_s=42) + retry_state.outcome = outcome + assert _wait_strategy(retry_state) == 42.0 + outcome.exception.return_value = _Transient("429", wait_s=60) + assert _wait_strategy(retry_state) == 60.0 + + +def test_wait_strategy_falls_back_for_transient_without_wait_s(): + """5xx _Transient (no Retry-After) falls through to exponential jitter.""" + retry_state = MagicMock() + outcome = MagicMock() + outcome.exception.return_value = _Transient("503 server error") # wait_s None + retry_state.outcome = outcome + retry_state.attempt_number = 1 + retry_state.idle_for = 0 + wait = _wait_strategy(retry_state) + assert isinstance(wait, float) and wait >= 0 + + +def test_wait_strategy_falls_back_for_non_transient(): + """Network errors (no wait_s) get exponential jitter.""" + retry_state = MagicMock() + outcome = MagicMock() + outcome.exception.return_value = aiohttp.ClientConnectionError("net error") + retry_state.outcome = outcome + retry_state.attempt_number = 1 + retry_state.idle_for = 0 + wait = _wait_strategy(retry_state) + assert isinstance(wait, float) and wait >= 0 + + +# --- BUG D3: assert→if-raise (asserts strip under python -O) ----------------- + +@pytest.mark.asyncio +async def test_fetch_session_unset_raises_runtime_not_assert(adapter): + """D3 regression: asserts strip under ``python -O``, so the session-not- + started precondition must be enforced with an explicit if-raise.""" + assert adapter._session is None # precondition: not yet started + with pytest.raises(RuntimeError, match="session not started"): + await adapter._fetch("event") + + +# --- BUG D5: tenacity has no default logging hooks (audit guard) ------------- + +def test_tenacity_decorator_has_explicit_no_log_hooks(): + """D5 audit: tenacity's defaults (before_sleep=None, after=after_nothing) + have no logging — so the URL-with-key can't leak via the retry path. We + pin them explicitly on @retry; if a future tenacity upgrade changes the + defaults, this test fails loudly. Also confirms reraise=True so we get + _Transient/ClientError verbatim instead of RetryError.""" + from tenacity import after_nothing, before_nothing + retrying = Itd511Adapter._fetch.retry + assert retrying.before_sleep is None + assert retrying.after is after_nothing + assert retrying.before is before_nothing + assert retrying.reraise is True diff --git a/tests/test_itd_511_cameras.py b/tests/test_itd_511_cameras.py new file mode 100644 index 0000000..9f4e8b1 --- /dev/null +++ b/tests/test_itd_511_cameras.py @@ -0,0 +1,206 @@ +"""Tests for the itd_511_cameras adapter (v0.10.0). + +Fixture covers 4 cameras: ITDNET, ACHD, UDOT (cross-border per v0.10.0 +finding 4), and an RWIS multi-view (to exercise the additional_views capture): + tests/fixtures/itd_511_cameras_sample.json +""" + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.adapter import SourceAdapter +from central.adapters.itd_511 import _Transient +from central.adapters.itd_511_cameras import NATIVE_SOURCES, Itd511CamerasAdapter +from central.config_models import AdapterConfig + +FIX = Path(__file__).parent / "fixtures" +CAMS = json.loads((FIX / "itd_511_cameras_sample.json").read_text()) + + +def _cfg(): + return AdapterConfig( + name="itd_511_cameras", enabled=True, cadence_s=600, + settings={"api_key_alias": "idaho_511"}, + updated_at=datetime.now(timezone.utc), + ) + + +@pytest.fixture +def adapter(tmp_path): + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value="testkey-32chars-deadbeefdeadbeef") + return Itd511CamerasAdapter(_cfg(), cs, tmp_path / "cursors.db") + + +def test_class_attributes_match_spec(): + assert Itd511CamerasAdapter.name == "itd_511_cameras" + assert Itd511CamerasAdapter.data_class == "telemetry" + assert Itd511CamerasAdapter.requires_api_key == "idaho_511" + assert Itd511CamerasAdapter.default_cadence_s == 600 + + +def test_build_event_category_and_subject(adapter): + e = adapter._build_event(CAMS[0]) + assert e.category == "camera.itd_511_cameras" + assert e.adapter == "itd_511_cameras" + assert adapter.subject_for(e) == f"central.traffic_cameras.us.id.{CAMS[0]['Id']}" + + +def test_dedup_id_per_utc_day(adapter): + e = adapter._build_event(CAMS[0]) + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + assert e.id == f"idaho_511:cam:{CAMS[0]['Id']}:{today}" + + +def test_image_url_passthrough(adapter): + e = adapter._build_event(CAMS[0]) + assert e.data["image_url"] == CAMS[0]["Views"][0]["Url"] + assert e.data["image_url"].startswith("https://511.idaho.gov/map/Cctv/") + + +def test_source_jurisdiction_preserves_border_cameras(adapter): + """Per v0.10.0 finding 4: ITD aggregates ~1.2% cross-DOT mirrors (UDOT, + ODOT, WYDOT, ...). Region stays US-ID; source_jurisdiction preserves the + raw upstream Source value for downstream re-bucketing.""" + udot = next((c for c in CAMS if c["Source"] == "UDOT"), None) + assert udot is not None, "fixture must include a UDOT cross-border camera" + e = adapter._build_event(udot) + assert e.data["source_jurisdiction"] == "UDOT" + assert e.data["source"] == "UDOT" + assert e.geo.primary_region == "US-ID" # uniform Idaho tagging per locked decision + assert e.geo.regions == ["US-ID"] + + +def test_multiple_views_captured(adapter): + multi = next((c for c in CAMS if len(c.get("Views") or []) > 1), None) + assert multi is not None, "fixture must include a multi-view camera" + e = adapter._build_event(multi) + assert e.data["view_count"] == len(multi["Views"]) + assert e.data["additional_views"] == [v["Url"] for v in multi["Views"][1:]] + + +def test_single_view_has_empty_additional_views(adapter): + single = next((c for c in CAMS if len(c.get("Views") or []) == 1), None) + assert single is not None, "fixture must include a single-view camera" + e = adapter._build_event(single) + assert e.data["additional_views"] == [] + assert e.data["view_count"] == 1 + + +def test_build_event_returns_none_without_id(adapter): + assert adapter._build_event({"Source": "ITDNET"}) is None + + +def test_severity_is_1_for_telemetry(adapter): + e = adapter._build_event(CAMS[0]) + assert e.severity == 1 + + +@pytest.mark.asyncio +async def test_poll_yields_one_event_per_camera(adapter): + await adapter.startup() + adapter._fetch_cameras = AsyncMock(return_value=CAMS) + events = [e async for e in adapter.poll()] + await adapter.shutdown() + assert len(events) == len(CAMS) + assert all(e.adapter == "itd_511_cameras" for e in events) + assert all(e.category == "camera.itd_511_cameras" for e in events) + + +@pytest.mark.asyncio +async def test_poll_skips_cleanly_without_api_key(tmp_path): + cs = MagicMock() + cs.get_api_key = AsyncMock(return_value=None) + adapter = Itd511CamerasAdapter(_cfg(), cs, tmp_path / "cursors.db") + await adapter.startup() + events = [e async for e in adapter.poll()] + await adapter.shutdown() + assert events == [] + + +def test_summary_partial_renders(): + from central.gui.routes import _derive_subject + inner = {"camera_id": 1, "location": "I-15 UT/ID State Line UT", "roadway": "I-15"} + row = {"adapter": "itd_511_cameras", "data": {"data": {"data": inner}}} + assert _derive_subject(row) == "Camera: I-15 UT/ID State Line UT" + + +def test_summary_partial_falls_back_to_id_when_location_missing(): + from central.gui.routes import _derive_subject + inner = {"camera_id": 42} + row = {"adapter": "itd_511_cameras", "data": {"data": {"data": inner}}} + assert _derive_subject(row) == "Camera: #42" + + +def test_inherits_dedup_mixin_from_source_adapter(): + for m in ("is_published", "mark_published", "sweep_old_ids"): + assert m not in Itd511CamerasAdapter.__dict__, f"redefines {m}" + assert getattr(Itd511CamerasAdapter, m) is getattr(SourceAdapter, m) + + +# --- BUG E: poll() must catch _Transient (tenacity reraise after retries) --- + +@pytest.mark.asyncio +async def test_poll_catches_transient_after_exhausted_retries(adapter): + """BUG E regression: cameras.poll() except tuple must include _Transient + so tenacity's reraise of a persistent 5xx or 429 (after exhausted + retries) does NOT crash the whole poll cycle.""" + await adapter.startup() + + async def boom_transient(): + raise _Transient("503 persistent") + + adapter._fetch_cameras = boom_transient + events = [e async for e in adapter.poll()] + await adapter.shutdown() + assert events == [] # poll exited cleanly, didn't raise + + +# --- BUG D2: NATIVE_SOURCES allow-list lives at the adapter, not the partial - + +def test_native_sources_module_constant(): + assert NATIVE_SOURCES == frozenset({"ITDNET", "Idaho511", "ACHD", "RWIS"}) + + +def test_is_native_source_flag_set_per_camera(adapter): + """Border-region UDOT camera is non-native; ITDNET is native.""" + udot = next(c for c in CAMS if c["Source"] == "UDOT") + itdnet = next(c for c in CAMS if c["Source"] == "ITDNET") + assert adapter._build_event(udot).data["is_native_source"] is False + assert adapter._build_event(itdnet).data["is_native_source"] is True + + +def test_row_partial_does_not_hardcode_source_list(adapter): + """D2 regression: the cross-DOT-mirror annotation is driven by + data.is_native_source — the partial must NOT carry the source allow-list + itself ([[feedback_no_hardcoding]]).""" + from central.gui.routes import _get_templates + tmpl = _get_templates().env.get_template("_event_rows/itd_511_cameras.html") + udot_evt = adapter._build_event(next(c for c in CAMS if c["Source"] == "UDOT")) + itdnet_evt = adapter._build_event(next(c for c in CAMS if c["Source"] == "ITDNET")) + udot_html = tmpl.render(event={"data": {"data": {"data": udot_evt.data}}}) + itdnet_html = tmpl.render(event={"data": {"data": {"data": itdnet_evt.data}}}) + assert "(cross-DOT mirror)" in udot_html + assert "(cross-DOT mirror)" not in itdnet_html + # Audit: the partial source on disk must not contain the allow-list. + partial = Path(__file__).resolve().parents[1] / ( + "src/central/gui/templates/_event_rows/itd_511_cameras.html" + ) + text = partial.read_text() + for name in NATIVE_SOURCES: + assert name not in text, f"partial hardcodes {name}" + + +# --- BUG D3: assert→if-raise on the cameras sibling -------------------------- + +@pytest.mark.asyncio +async def test_fetch_session_unset_raises_runtime(adapter): + """D3 regression: asserts strip under python -O; the session-not-started + precondition must hold even with optimizations.""" + assert adapter._session is None + with pytest.raises(RuntimeError, match="session not started"): + await adapter._fetch_cameras() diff --git a/tests/test_telemetry_separation.py b/tests/test_telemetry_separation.py index ea03410..ddca810 100644 --- a/tests/test_telemetry_separation.py +++ b/tests/test_telemetry_separation.py @@ -11,7 +11,7 @@ from central.adapter_discovery import discover_adapters from central.gui import routes # Adapters with data_class="telemetry" (the pinned split; grow as telemetry adapters land). -_TELEMETRY = ["nwis", "state_511_atis_cameras", "tomtom_flow"] +_TELEMETRY = ["itd_511_cameras", "nwis", "state_511_atis_cameras", "tomtom_flow"] # --- data_class defaults / registry split ----------------------------------- diff --git a/uv.lock b/uv.lock index 15c7f7f..2a5ae75 100644 --- a/uv.lock +++ b/uv.lock @@ -185,6 +185,7 @@ dependencies = [ { name = "jinja2" }, { name = "mapbox-vector-tile" }, { name = "nats-py" }, + { name = "polyline" }, { name = "pydantic" }, { name = "pydantic-settings" }, { name = "python-multipart" }, @@ -214,6 +215,7 @@ requires-dist = [ { name = "jinja2", specifier = ">=3.1.6" }, { name = "mapbox-vector-tile", specifier = ">=2.0" }, { name = "nats-py", specifier = ">=2.14.0" }, + { name = "polyline", specifier = ">=2.0,<3" }, { name = "pydantic", specifier = ">=2,<3" }, { name = "pydantic-settings", specifier = ">=2.7.0" }, { name = "python-multipart", specifier = ">=0.0.20" }, @@ -648,6 +650,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, ] +[[package]] +name = "polyline" +version = "2.0.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2c/c1/1a6ee4f9f02a55b2a9241bbadd342970160f7e42423307f21ee8f5530d4e/polyline-2.0.4.tar.gz", hash = "sha256:f05ade694522bf1720febebe1672f820f43a13c6a1664751e7769d47e8ca9b1b", size = 8261, upload-time = "2025-12-02T17:55:22.735Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/34/a8/4ebd3cb31d380e018efb1c8bf92664b196a41aba19506015b682af2587b9/polyline-2.0.4-py3-none-any.whl", hash = "sha256:a4e0c15b8ecb32915559f8cf210f1f8c2f5cc53d3cd32c91d7c1668d6e936e10", size = 7167, upload-time = "2025-12-02T17:55:20.323Z" }, +] + [[package]] name = "propcache" version = "0.5.2"