diff --git a/meshai/env/roads511.py b/meshai/env/roads511.py index 95a670f..e9b2c80 100644 --- a/meshai/env/roads511.py +++ b/meshai/env/roads511.py @@ -8,11 +8,13 @@ import json import logging import os import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from urllib.parse import urljoin +from meshai.notifications.events import Event, make_event + if TYPE_CHECKING: from ..config import Roads511Config @@ -348,6 +350,64 @@ class Roads511Adapter: logger.debug(f"511 event parse error: {e} - item: {item}") return None + def to_event(self, evt: dict) -> Optional["Event"]: + """Translate a stored 511 road event dict into a pipeline Event. + + Args: + evt: Internal event dict from get_events() + + Returns: + Event instance ready for EventBus emission, or None if the + dict is missing required fields (lat/lon or event_id). + """ + try: + lat = evt.get("lat") + lon = evt.get("lon") + if lat is None or lon is None: + return None # Can't make a useful Event without coords + + event_id = evt.get("event_id") + if not event_id: + return None # No stable identity to group/inhibit on + + props = evt.get("properties", {}) or {} + severity = evt.get("severity", "routine") + title = evt.get("headline", "") or evt.get("event_type", "") or "Road Event" + + # Richer summary: closure status, roadway, description + summary_parts = [title] + if props.get("is_closure"): + summary_parts.append("road closed") + roadway = props.get("roadway") + if roadway and str(roadway) not in title: + summary_parts.append(str(roadway)) + desc = evt.get("description") + if desc and desc not in title: + summary_parts.append(desc) + summary = " | ".join(summary_parts)[:300] + + # The stored event_id is already the stable "511_{id}" key. Re-polls + # of the same incident coalesce on this group_key; using it as the + # sole inhibit_key lets the pipeline Inhibitor suppress + # lower-severity re-emissions while a higher-severity one is active + # for the same incident (severity tiering delegated to Inhibitor). + return make_event( + source="511", + category="road_closure", + severity=severity, + title=title, + summary=summary, + timestamp=evt.get("fetched_at"), + expires=evt.get("expires"), + lat=lat, + lon=lon, + group_key=event_id, + inhibit_keys=[event_id], + ) + except Exception: + logger.exception(f"511 to_event failed for evt: {evt.get('event_id')}") + return None + def get_events(self) -> list: """Get current road events.""" return self._events diff --git a/tests/test_adapter_roads511.py b/tests/test_adapter_roads511.py new file mode 100644 index 0000000..4322c11 --- /dev/null +++ b/tests/test_adapter_roads511.py @@ -0,0 +1,202 @@ +"""Tests for 511 roads adapter Phase 2.8 — to_event() method.""" + +import time +from unittest.mock import MagicMock + +import pytest + +from meshai.env.roads511 import Roads511Adapter +from meshai.notifications.events import Event + + +# ============================================================ +# FIXTURES +# ============================================================ + +@pytest.fixture +def mock_config(): + """Create a mock Roads511Config with real scalar fields.""" + config = MagicMock() + config.api_key = "" + config.base_url = "https://511.example.gov/api/v2" + config.endpoints = ["/get/event"] + config.bbox = [] + config.tick_seconds = 300 + return config + + +@pytest.fixture +def adapter(mock_config): + """Create a Roads511Adapter with mocked config.""" + return Roads511Adapter(mock_config) + + +def make_511_event( + event_id="511_evt123", + event_type="Closure", + roadway="US-93", + description="Rockslide, road closed both directions", + severity="priority", + lat=42.6, + lon=-114.46, + is_closure=True, + headline=None, +): + """Helper to create a stored 511 event dict (mirrors _parse_event).""" + now = time.time() + if headline is None: + headline = f"{roadway}: {description[:100]}" + return { + "source": "511", + "event_id": event_id, + "event_type": event_type, + "headline": headline, + "description": description, + "severity": severity, + "lat": lat, + "lon": lon, + "expires": now + 21600, + "fetched_at": now, + "properties": { + "roadway": roadway, + "is_closure": is_closure, + "last_updated": "2026-05-27T12:00:00Z", + }, + } + + +# ============================================================ +# CATEGORY TESTS +# ============================================================ + +def test_to_event_category_is_road_closure(adapter): + """511 events map to the road_closure category.""" + event = adapter.to_event(make_511_event()) + assert event is not None + assert event.category == "road_closure" + + +def test_to_event_nonclosure_still_road_closure(adapter): + """A construction event is still road_closure category (severity differs).""" + event = adapter.to_event( + make_511_event(event_type="Construction", severity="routine", is_closure=False) + ) + assert event is not None + assert event.category == "road_closure" + + +# ============================================================ +# SEVERITY PASS-THROUGH TESTS +# ============================================================ + +def test_to_event_severity_passes_through(adapter): + """Severity from the stored event passes through unchanged.""" + for sev in ["routine", "priority", "immediate"]: + event = adapter.to_event(make_511_event(severity=sev)) + assert event is not None + assert event.severity == sev + + +# ============================================================ +# GROUP KEY / INHIBIT KEY TESTS +# ============================================================ + +def test_to_event_group_key_is_stable_event_id(adapter): + """Group key is the stable 511_{event_id} key.""" + event = adapter.to_event(make_511_event(event_id="511_abc")) + assert event is not None + assert event.group_key == "511_abc" + + +def test_to_event_inhibit_keys_match_group_key(adapter): + """The sole inhibit key equals the group key (Inhibitor does severity tiering).""" + event = adapter.to_event(make_511_event()) + assert event is not None + assert event.inhibit_keys == [event.group_key] + + +def test_two_polls_same_incident_share_group_key(adapter): + """Two re-polls of the same incident (any severity) share the group key.""" + e1 = adapter.to_event(make_511_event(event_id="511_x", severity="routine")) + e2 = adapter.to_event(make_511_event(event_id="511_x", severity="priority")) + assert e1 is not None and e2 is not None + assert e1.group_key == e2.group_key + + +def test_distinct_incidents_distinct_group_keys(adapter): + """Distinct incidents get distinct group keys.""" + e1 = adapter.to_event(make_511_event(event_id="511_a")) + e2 = adapter.to_event(make_511_event(event_id="511_b")) + assert e1.group_key != e2.group_key + + +# ============================================================ +# CONTENT / FIELD POPULATION TESTS +# ============================================================ + +def test_to_event_populates_core_fields(adapter): + """Core Event fields are populated from the stored dict.""" + evt = make_511_event(lat=42.61, lon=-114.21) + event = adapter.to_event(evt) + assert event is not None + assert event.source == "511" + assert event.lat == 42.61 + assert event.lon == -114.21 + assert event.expires == evt["expires"] + assert event.timestamp == evt["fetched_at"] + assert event.id # auto-computed + + +def test_to_event_summary_notes_closure(adapter): + """Summary notes a road closure.""" + event = adapter.to_event(make_511_event(is_closure=True)) + assert event is not None + assert "road closed" in event.summary + + +def test_to_event_title_falls_back_when_headline_empty(adapter): + """Empty headline falls back to event_type.""" + event = adapter.to_event(make_511_event(headline="", event_type="Incident")) + assert event is not None + assert event.title == "Incident" + + +# ============================================================ +# DEFENSIVE TESTS +# ============================================================ + +def test_to_event_missing_coords_returns_none(adapter): + """Missing coordinates returns None.""" + evt = make_511_event() + evt["lat"] = None + assert adapter.to_event(evt) is None + + +def test_to_event_missing_event_id_returns_none(adapter): + """Missing event_id returns None (no stable group key).""" + evt = make_511_event() + evt["event_id"] = None + assert adapter.to_event(evt) is None + + +def test_to_event_missing_properties_returns_event(adapter): + """No properties dict still yields an event (props only enrich the summary).""" + evt = { + "source": "511", + "event_id": "511_z", + "event_type": "Closure", + "headline": "US-30 closed", + "severity": "priority", + "lat": 42.6, + "lon": -114.4, + "fetched_at": time.time(), + } + event = adapter.to_event(evt) + assert event is not None + assert event.category == "road_closure" + assert event.group_key == "511_z" + + +def test_to_event_does_not_raise_on_corrupted_dict(adapter): + """Corrupted dict returns None without raising.""" + assert adapter.to_event({"garbage": True}) is None