feat(notifications): Phase 2.7 traffic adapter pipeline integration

Adds TomTomTrafficAdapter.to_event(), wiring the traffic adapter into
the notification EventBus following the FIRMS pattern (Phase 2.6).

to_event() design:
- Category: fixed "traffic_congestion" (a road closure raises severity,
  not category).
- Severity: passed through unchanged from the adapter's existing
  _fetch_point logic (priority on closure / heavy congestion, else
  routine). No threshold is re-derived or introduced in to_event.
- Summary enriched with current/free-flow speed, % free flow, closure,
  and confidence.
- Defensive: missing lat/lon or missing corridor identity returns None;
  the whole body is try/except-guarded (returns None on corruption).

Inhibit-key composition:
- A single stable per-corridor key, "traffic_{corridor}" (lowercased,
  spaces->_), is used as BOTH group_key and the sole inhibit_key. This
  matches the adapter's own event_id, so re-polls of a corridor coalesce.
- Severity tiering is delegated to the pipeline Inhibitor, which ranks
  routine<priority<immediate per shared inhibit_key: a higher-severity
  emission for a corridor suppresses lower-severity re-emissions of the
  same corridor within the Inhibitor TTL window. No severity is encoded
  into the key (mirrors FIRMS's spatial-key approach).

Store wiring: no change. EnvironmentalStore._ingest()'s generic "else"
branch already emits any adapter exposing to_event() (live since 2.6.5).

Rule 17: to_event introduces no new tunable. The api_key is injected via
the secrets channel ($TOMTOM_API_KEY in /data/secrets/.env, referenced
as ${TOMTOM_API_KEY} in env_feeds.yaml) -- the GUI-editable reference
stays in config while the secret never enters git. The only other knob
in play is the pipeline-level Inhibitor TTL (1800s, set in
build_pipeline), which is pipeline infrastructure, not traffic-owned;
left out of scope.

Tests: tests/test_adapter_traffic.py (15 tests) mirrors
test_adapter_firms.py -- category, severity pass-through,
group_key/inhibit_keys, field population, defensive cases. Full suite:
147 passed.

Smoke test (prod, Magic Valley corridors I-84 @ Jerome, US-93 Perrine
Bridge, US-30 Twin Falls): clean startup, 6 env adapters loaded, no
traceback. "TomTom traffic updated: 3 corridors" (no auth/DNS error),
then 3 Events emitted to the pipeline bus with traffic_congestion
category -- the full store->bus->pipeline path observed live. Emission
count stable at 3 (one per corridor, is_new-gated).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-27 19:17:27 +00:00
commit d9cc80daf8
2 changed files with 270 additions and 1 deletions

66
meshai/env/traffic.py vendored
View file

@ -4,11 +4,13 @@ import json
import logging
import os
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
from meshai.notifications.events import Event, make_event
if TYPE_CHECKING:
from ..config import TomTomConfig
@ -235,6 +237,68 @@ class TomTomTrafficAdapter:
self._consecutive_errors += 1
return None
def to_event(self, evt: dict) -> Optional["Event"]:
"""Translate a stored traffic event dict into a pipeline Event.
Args:
evt: Internal event dict from get_events()
Returns:
Event instance ready for EventBus emission, or None if the
dict is missing required fields (lat/lon or corridor identity).
"""
try:
lat = evt.get("lat")
lon = evt.get("lon")
if lat is None or lon is None:
return None # Can't make a useful Event without coords
props = evt.get("properties", {}) or {}
corridor = props.get("corridor")
if not corridor:
return None # No stable identity to group/inhibit on
severity = evt.get("severity", "routine")
title = evt.get("headline", "") or f"Traffic: {corridor}"
# Richer summary: speed vs free flow, closure, confidence
summary_parts = [title]
if props.get("roadClosure"):
summary_parts.append("road closed")
if props.get("currentSpeed") is not None and props.get("freeFlowSpeed"):
summary_parts.append(
f"{int(props['currentSpeed'])}/{int(props['freeFlowSpeed'])} mph"
)
if props.get("speedRatio") is not None:
summary_parts.append(f"{int(props['speedRatio'] * 100)}% free flow")
if props.get("confidence") is not None:
summary_parts.append(f"conf {props['confidence']}")
summary = " | ".join(summary_parts)[:300]
# Stable per-corridor key (matches the adapter's own event_id
# derivation). Re-polls of the same corridor coalesce on this
# group_key; using it as the sole inhibit_key lets the pipeline
# Inhibitor suppress lower-severity re-emissions while a
# higher-severity one is active for the same corridor.
corridor_key = f"traffic_{str(corridor).replace(' ', '_').lower()}"
return make_event(
source="traffic",
category="traffic_congestion",
severity=severity,
title=title,
summary=summary,
timestamp=evt.get("fetched_at"),
expires=evt.get("expires"),
lat=lat,
lon=lon,
group_key=corridor_key,
inhibit_keys=[corridor_key],
)
except Exception:
logger.exception(f"Traffic to_event failed for evt: {evt.get('event_id')}")
return None
def get_events(self) -> list:
"""Get current traffic events."""
return self._events