feat(notifications): Phase 2.8 roads511 adapter pipeline integration

Adds Roads511Adapter.to_event(), wiring the state 511 road-events adapter
into the notification EventBus following the Phase 2.7 traffic pattern.

to_event() design:
- Category: fixed "road_closure".
- Severity: passed through unchanged from the adapter's existing
  _parse_event logic (priority on closure, else routine).
- Summary enriched with closure status, roadway, and description.
- group_key: the stored event_id (already the stable "511_{id}" key), so
  re-polls of the same incident coalesce.
- inhibit_keys: a single key equal to group_key. Severity tiering is
  delegated to the pipeline Inhibitor (ranks routine<priority<immediate
  per shared key, suppressing lower-severity re-emissions of the same
  incident within the Inhibitor TTL). No severity encoded into the key.
- Defensive: missing lat/lon or missing event_id returns None; whole body
  is try/except-guarded (returns None on corruption).

Store wiring: no change. EnvironmentalStore._ingest()'s generic "else"
branch already emits any adapter exposing to_event() (live since 2.6.5).

Rule 17: to_event introduces no new tunable. (The state base_url / bbox /
api_key already exist in Roads511Config and env_feeds.yaml; secrets go in
/data/secrets/.env via ${VAR}, never git.)

Tests: tests/test_adapter_roads511.py (14 tests) mirrors
test_adapter_traffic.py -- category, severity pass-through,
group_key/inhibit_keys, field population, defensive cases. Full suite:
161 passed.

live smoke test SKIPPED: Idaho 511 v2 (511.idaho.gov/api/v2) requires an
API key ("Invalid Key" response) and none is available in .ref/credentials
(cannot self-register). Per the standing key-less-adapter policy, the code
+ unit tests are committed and Gate D is skipped; roads511 is left disabled
in prod (enabling it keyless would only emit HTTP 400 errors). The
to_event() path is fully unit-validated and structurally identical to the
live traffic/FIRMS wiring (same EnvironmentalStore->EventBus path); live
validation will run if/when an Idaho 511 key is provided.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-27 21:18:21 +00:00
commit f273a8d5b0
2 changed files with 263 additions and 1 deletions

View file

@ -8,11 +8,13 @@ import json
import logging
import os
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urljoin
from meshai.notifications.events import Event, make_event
if TYPE_CHECKING:
from ..config import Roads511Config
@ -348,6 +350,64 @@ class Roads511Adapter:
logger.debug(f"511 event parse error: {e} - item: {item}")
return None
def to_event(self, evt: dict) -> Optional["Event"]:
"""Translate a stored 511 road event dict into a pipeline Event.
Args:
evt: Internal event dict from get_events()
Returns:
Event instance ready for EventBus emission, or None if the
dict is missing required fields (lat/lon or event_id).
"""
try:
lat = evt.get("lat")
lon = evt.get("lon")
if lat is None or lon is None:
return None # Can't make a useful Event without coords
event_id = evt.get("event_id")
if not event_id:
return None # No stable identity to group/inhibit on
props = evt.get("properties", {}) or {}
severity = evt.get("severity", "routine")
title = evt.get("headline", "") or evt.get("event_type", "") or "Road Event"
# Richer summary: closure status, roadway, description
summary_parts = [title]
if props.get("is_closure"):
summary_parts.append("road closed")
roadway = props.get("roadway")
if roadway and str(roadway) not in title:
summary_parts.append(str(roadway))
desc = evt.get("description")
if desc and desc not in title:
summary_parts.append(desc)
summary = " | ".join(summary_parts)[:300]
# The stored event_id is already the stable "511_{id}" key. Re-polls
# of the same incident coalesce on this group_key; using it as the
# sole inhibit_key lets the pipeline Inhibitor suppress
# lower-severity re-emissions while a higher-severity one is active
# for the same incident (severity tiering delegated to Inhibitor).
return make_event(
source="511",
category="road_closure",
severity=severity,
title=title,
summary=summary,
timestamp=evt.get("fetched_at"),
expires=evt.get("expires"),
lat=lat,
lon=lon,
group_key=event_id,
inhibit_keys=[event_id],
)
except Exception:
logger.exception(f"511 to_event failed for evt: {evt.get('event_id')}")
return None
def get_events(self) -> list:
"""Get current road events."""
return self._events