central/tests/test_telemetry_separation.py
malice 1d5548c24c
v0.10.0: ITD 511 official API adapter (events + advisories + cameras) (#85)
First official-state-DOT-API pattern landing. Two adapters in one PR:

- itd_511 (event-class): polls Events (60s) + Advisories (300s) from
  https://511.idaho.gov/api/v2/get/{event,alerts}. Decodes EncodedPolyline
  to LineString via the polyline lib (bookend LineString or Point fallback);
  ITD Severity string mapped None->1 / Minor->2 / Major->3 with
  IsFullClosure=true forcing 3 regardless; RecurrenceSchedules /
  Restrictions / DetourPolyline pass through unmodified. Advisories ship
  as structural pass-through under data.advisory since the upstream
  /alerts endpoint currently returns []; per-record try/except keeps a
  surprise shape from sinking the cycle when ITD posts its first one.

- itd_511_cameras (telemetry-class): polls Cameras (600s). One event per
  camera per UTC day; image URL passes straight through to <img src>.
  Region uniform US-ID with data.source_jurisdiction preserving the raw
  upstream Source field for the ~1.2% cross-DOT border-region mirrors
  (UDOT / ODOT / WYDOT / WSDOT / NDot / MTD / DriveBC / Lemhi County).

Subject convention (v0.9.20 forward): central.traffic.<event_type>.us.id
and central.traffic_cameras.us.id.<camera_id>. Castle Rock state_511_atis
keeps its bare-state subject; consumers stay on central.traffic.>
wildcards during the A/B comparison window.

Retry predicate tightened from the Castle Rock / TomTom precedent: 5xx +
connection / timeout retry; 4xx other than 429 skip-with-warn (don't
burn quota on permanent errors); 429 honors Retry-After once then
retries. API key (alias 'idaho_511') travels in the ?key= query string,
so every error log path runs through self._redact() to scrub the URL.

Both adapters ship disabled; operator enables via GUI after registering
the API key with 'python -m set_api_key idaho_511'. Reuses existing
CENTRAL_TRAFFIC and CENTRAL_TRAFFIC_CAMERAS streams -- no archive
restart needed.

Scope-cap exception: this PR is ~1.5k lines vs. the standard 500-line
cap, authorized as a one-time exception for the first
official-state-DOT-API pattern landing. Two adapters + their tests +
real-API fixtures naturally exceed the v0.9.x adapter-cap budget.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 22:36:26 -06:00

104 lines
4 KiB
Python

"""Tests for v0.7.4 telemetry/event separation: SourceAdapter.data_class,
registry split, class-scoped filter options, and the data_class SQL filter.
Registry-derived (no hardcoded adapter lists beyond the pinned telemetry set). No live DB.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from central.adapter import SourceAdapter
from central.adapter_discovery import discover_adapters
from central.gui import routes
# Adapters with data_class="telemetry" (the pinned split; grow as telemetry adapters land).
# Keep this list in ascending sorted order: the tests below compare it directly
# against sorted(...) output, so an out-of-order entry fails the equality checks.
_TELEMETRY = ["itd_511_cameras", "nwis", "state_511_atis_cameras", "tomtom_flow"]
# --- data_class defaults / registry split -----------------------------------
def test_base_default_is_event():
    """SourceAdapter.data_class, read off the base class, equals "event"."""
    base_class_value = SourceAdapter.data_class
    assert base_class_value == "event"
def test_registry_split_11_event_1_telemetry():
    """Partition the discovered registry by data_class and check both halves.

    The counts in the function name are historical; the assertions are driven
    by discover_adapters() and the _TELEMETRY pin, not by fixed numbers.
    """
    registry = discover_adapters()
    # An adapter class without a data_class attribute counts as "event".
    class_of = {
        name: getattr(adapter_cls, "data_class", "event")
        for name, adapter_cls in registry.items()
    }
    telemetry_names = [name for name, kind in class_of.items() if kind == "telemetry"]
    event_names = [name for name, kind in class_of.items() if kind == "event"]

    assert sorted(telemetry_names) == _TELEMETRY
    # Everything else is event-class; the split must cover the whole registry.
    non_telemetry = sorted(name for name in registry if name not in _TELEMETRY)
    assert sorted(event_names) == non_telemetry
    assert len(event_names) == len(registry) - len(_TELEMETRY)
def test_class_adapter_names():
    """routes._class_adapter_names returns the per-class adapter name set.

    "nwis" must be absent from the event names, the telemetry names must equal
    the _TELEMETRY pin, and "usgs_quake" must be among the event names.
    """
    names_for = routes._class_adapter_names

    assert "nwis" not in names_for("event")

    telemetry_sorted = sorted(names_for("telemetry"))
    assert telemetry_sorted == _TELEMETRY

    assert "usgs_quake" in names_for("event")
# --- class-scoped chip-picker / legend options -------------------------------
def test_event_options_exclude_nwis():
    """Event-scoped filter options omit "nwis" from both the flat and grouped forms.

    Also checks that the flat list has one entry per registry adapter that is
    not in the _TELEMETRY pin.
    """
    flat, grouped = routes._adapter_filter_options("event")

    flat_names = set()
    for entry in flat:
        flat_names.add(entry["name"])
    assert "nwis" not in flat_names

    expected_count = len(discover_adapters()) - len(_TELEMETRY)
    assert len(flat) == expected_count

    # grouped is iterated as (label, items) pairs; each item carries a "value".
    grouped_values = set()
    for _label, items in grouped:
        for option in items:
            grouped_values.add(option["value"])
    assert "nwis" not in grouped_values
def test_telemetry_options_only_nwis():
    """Telemetry-scoped filter options list exactly the _TELEMETRY adapters.

    Both the flat entries (by "name") and the grouped options (by "value")
    must sort to the pinned list.
    """
    flat, grouped = routes._adapter_filter_options("telemetry")

    flat_names = [entry["name"] for entry in flat]
    assert sorted(flat_names) == _TELEMETRY

    grouped_values = []
    for _label, items in grouped:
        grouped_values.extend(option["value"] for option in items)
    assert sorted(grouped_values) == _TELEMETRY
def test_colors_stable_across_classes():
    """A given adapter keeps the same color on /events and /telemetry (colors
    are keyed to the full registry, not the per-tab subset).

    The unscoped option list is taken as the reference; every entry returned
    for each class-scoped call must carry the same color as its reference
    entry. Both the "event" and the "telemetry" subsets are checked so the
    test covers both tabs named above.
    """
    full, _ = routes._adapter_filter_options()
    full_color = {a["name"]: a["color"] for a in full}
    for data_class in ("event", "telemetry"):
        scoped, _ = routes._adapter_filter_options(data_class)
        for a in scoped:
            assert a["color"] == full_color[a["name"]], (
                f"{a['name']} color differs between the full registry "
                f"and the {data_class} subset"
            )
# --- data_class SQL filter (captured SQL) ------------------------------------
async def _capture(parsed):
    """Run routes._fetch_events(parsed) against a mocked pool and record the SQL.

    central.gui.routes.get_pool is patched to return a MagicMock pool whose
    acquire() async context manager yields a connection with a fake fetch().

    Returns:
        dict with "query" (first positional argument passed to fetch) and
        "params" (remaining positional arguments as a list). The dict is
        empty if fetch was never called.
    """
    seen = {}

    async def record_fetch(query, *bind_args):
        # Remember the most recent call and hand back an empty result set.
        seen.update(query=query, params=list(bind_args))
        return []

    connection = MagicMock()
    connection.fetch = record_fetch

    # Async context manager returned by pool.acquire().
    acquire_cm = MagicMock()
    acquire_cm.__aenter__ = AsyncMock(return_value=connection)
    acquire_cm.__aexit__ = AsyncMock(return_value=None)

    pool = MagicMock()
    pool.acquire.return_value = acquire_cm

    with patch("central.gui.routes.get_pool", return_value=pool):
        await routes._fetch_events(parsed)
    return seen
@pytest.mark.asyncio
async def test_class_adapters_adds_adapter_any_condition():
    """Setting parsed["class_adapters"] yields an adapter = ANY(...) SQL filter.

    The captured query must contain the ANY condition and the event-class
    adapter names must appear among the bound parameters.
    """
    request_params = {"time": "all"}
    parsed, _unused = routes._parse_events_params(request_params, default_offset=0)
    parsed["class_adapters"] = routes._class_adapter_names("event")

    captured = await _capture(parsed)

    sql_text = captured["query"]
    bound = captured["params"]
    assert "adapter = ANY($" in sql_text
    assert routes._class_adapter_names("event") in bound
@pytest.mark.asyncio
async def test_no_class_adapters_no_class_condition():
    """events.json path: no class_adapters -> no extra adapter filter (all classes)."""
    request_params = {"time": "all"}
    # Cursor-mode parse (no default_offset), and no class restriction applied.
    parsed, _unused = routes._parse_events_params(request_params)
    assert parsed.get("class_adapters") is None

    captured = await _capture(parsed)

    # The only adapter=ANY would come from a user filter, which we didn't set.
    sql_text = captured["query"]
    assert "adapter = ANY($" not in sql_text