diff --git a/meshai/env/store.py b/meshai/env/store.py index 6f1543b..ffcdb2a 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -113,6 +113,8 @@ class EnvironmentalStore: """Convert raw event to pipeline Event and emit to bus.""" try: event = adapter.to_event(raw_evt) + if event is None: + return # adapter declined to emit (non-actionable reading) self._event_bus.emit(event) logger.info( "Emitted %s event %s (%s) to pipeline bus", diff --git a/meshai/env/usgs.py b/meshai/env/usgs.py index cb7d7f6..841b648 100644 --- a/meshai/env/usgs.py +++ b/meshai/env/usgs.py @@ -13,6 +13,8 @@ from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from urllib.parse import urlencode +from meshai.notifications.events import Event, make_event + if TYPE_CHECKING: from ..config import USGSConfig @@ -435,6 +437,76 @@ class USGSStreamsAdapter: return changed + def to_event(self, evt: dict) -> Optional["Event"]: + """Translate a stored USGS gauge reading into a pipeline Event. + + Only elevated readings are emitted: the category is chosen from + flood_status, so a routine (below-action-stage) reading -- which has + no flood_status -- is intentionally NOT emitted (returns None). + + Args: + evt: Internal event dict from get_events() + + Returns: + Event instance ready for EventBus emission, or None if the dict + is missing lat/lon or event_id, or the reading is not elevated. + """ + try: + lat = evt.get("lat") + lon = evt.get("lon") + if lat is None or lon is None: + return None # Can't make a useful Event without coords + + event_id = evt.get("event_id") + if not event_id: + return None # No stable identity to group/inhibit on + + props = evt.get("properties", {}) or {} + flood_status = props.get("flood_status") + if not flood_status: + return None # routine reading -- not actionable, do not emit + + # Category from flood_status: an exceeded stage is a flood warning; + # "Action Stage" (approaching) is high water. + if "Flood" in str(flood_status): + category = "stream_flood_warning" + else: # "Action Stage" + category = "stream_high_water" + + severity = evt.get("severity", "routine") + title = evt.get("headline", "") or props.get("site_name") or "Stream Gauge" + + # Summary: reading value/unit and the flood status + summary_parts = [title] + value = props.get("value") + unit = props.get("unit") + if value is not None: + summary_parts.append(f"{value} {unit}".strip()) + summary_parts.append(str(flood_status)) + summary = " | ".join(summary_parts)[:300] + + # event_id is already the stable "{site_id}_{param}" key. Re-polls of + # the same gauge/parameter coalesce on this group_key; using it as the + # sole inhibit_key lets the pipeline Inhibitor suppress lower-severity + # re-emissions while a higher-severity one is active (severity tiering + # delegated to the Inhibitor). + return make_event( + source="usgs", + category=category, + severity=severity, + title=title, + summary=summary, + timestamp=evt.get("fetched_at"), + expires=evt.get("expires"), + lat=lat, + lon=lon, + group_key=event_id, + inhibit_keys=[event_id], + ) + except Exception: + logger.exception(f"USGS to_event failed for evt: {evt.get('event_id')}") + return None + def get_events(self) -> list: """Get current stream gauge events.""" return self._events diff --git a/tests/test_adapter_usgs.py b/tests/test_adapter_usgs.py new file mode 100644 index 0000000..990bc9b --- /dev/null +++ b/tests/test_adapter_usgs.py @@ -0,0 +1,193 @@ +"""Tests for USGS water adapter Phase 2.9 — to_event() method.""" + +import time +from unittest.mock import MagicMock + +import pytest + +from meshai.env.usgs import USGSStreamsAdapter +from meshai.notifications.events import Event + + +# ============================================================ +# FIXTURES +# ============================================================ + +@pytest.fixture +def mock_config(): + """Create a mock USGSConfig with real scalar fields.""" + config = MagicMock() + config.sites = [] + config.tick_seconds = 900 + config.flood_thresholds = {} + return config + + +@pytest.fixture +def adapter(mock_config): + """Create a USGSStreamsAdapter with mocked config.""" + return USGSStreamsAdapter(mock_config) + + +def make_usgs_event( + site_id="13090500", + param_type="height", + site_name="SNAKE RIVER NR TWIN FALLS ID", + severity="priority", + flood_status="Minor Flood", + value=10.8, + unit="ft", + lat=42.6, + lon=-114.47, + headline=None, +): + """Helper to create a stored USGS event dict (mirrors _fetch).""" + now = time.time() + if headline is None: + headline = f"{site_name}: {value} {unit}" + if flood_status: + headline += f" — {flood_status}" + return { + "source": "usgs", + "event_id": f"{site_id}_{param_type}", + "event_type": "Stream Gauge", + "headline": headline, + "severity": severity, + "lat": lat, + "lon": lon, + "expires": now + 1800, + "fetched_at": now, + "properties": { + "site_id": site_id, + "site_name": site_name, + "parameter": "Gage height" if param_type == "height" else "Streamflow", + "value": value, + "unit": unit, + "timestamp": "2026-05-27T12:00:00", + "flood_status": flood_status, + "flood_stages": {"action_stage": 9.0, "flood_stage": 10.5}, + }, + } + + +# ============================================================ +# CATEGORY TESTS +# ============================================================ + +def test_minor_flood_is_flood_warning(adapter): + """Minor/Moderate/Major flood maps to stream_flood_warning.""" + for status in ["Minor Flood", "Moderate Flood", "Major Flood"]: + event = adapter.to_event(make_usgs_event(flood_status=status)) + assert event is not None + assert event.category == "stream_flood_warning" + + +def test_action_stage_is_high_water(adapter): + """Action Stage maps to stream_high_water.""" + event = adapter.to_event(make_usgs_event(flood_status="Action Stage", severity="routine")) + assert event is not None + assert event.category == "stream_high_water" + + +# ============================================================ +# SEVERITY PASS-THROUGH TESTS +# ============================================================ + +def test_severity_passes_through(adapter): + """Severity from the stored event passes through unchanged.""" + for sev in ["routine", "priority", "immediate"]: + event = adapter.to_event(make_usgs_event(severity=sev, flood_status="Minor Flood")) + assert event is not None + assert event.severity == sev + + +# ============================================================ +# GROUP KEY / INHIBIT KEY TESTS +# ============================================================ + +def test_group_key_is_site_param(adapter): + """Group key is the stable {site_id}_{param} key.""" + event = adapter.to_event(make_usgs_event(site_id="13090500", param_type="height")) + assert event is not None + assert event.group_key == "13090500_height" + + +def test_inhibit_keys_match_group_key(adapter): + """The sole inhibit key equals the group key (Inhibitor does severity tiering).""" + event = adapter.to_event(make_usgs_event()) + assert event is not None + assert event.inhibit_keys == [event.group_key] + + +def test_flow_and_height_distinct_keys(adapter): + """Flow and height for the same site get distinct group keys.""" + e_h = adapter.to_event(make_usgs_event(param_type="height")) + e_f = adapter.to_event(make_usgs_event(param_type="flow")) + assert e_h.group_key != e_f.group_key + + +# ============================================================ +# CONTENT / FIELD POPULATION TESTS +# ============================================================ + +def test_populates_core_fields(adapter): + """Core Event fields are populated from the stored dict.""" + evt = make_usgs_event(lat=42.61, lon=-114.48) + event = adapter.to_event(evt) + assert event is not None + assert event.source == "usgs" + assert event.lat == 42.61 + assert event.lon == -114.48 + assert event.expires == evt["expires"] + assert event.timestamp == evt["fetched_at"] + assert event.id # auto-computed + + +def test_summary_includes_value_and_status(adapter): + """Summary includes the reading value and flood status.""" + event = adapter.to_event(make_usgs_event(value=11.2, unit="ft", flood_status="Moderate Flood")) + assert event is not None + assert "11.2 ft" in event.summary + assert "Moderate Flood" in event.summary + + +# ============================================================ +# DEFENSIVE / NON-EMIT TESTS +# ============================================================ + +def test_routine_reading_returns_none(adapter): + """A routine reading (no flood_status) is not emitted.""" + assert adapter.to_event(make_usgs_event(flood_status=None, severity="routine")) is None + + +def test_missing_coords_returns_none(adapter): + """Missing coordinates returns None.""" + evt = make_usgs_event() + evt["lat"] = None + assert adapter.to_event(evt) is None + + +def test_missing_event_id_returns_none(adapter): + """Missing event_id returns None (no stable group key).""" + evt = make_usgs_event() + evt["event_id"] = None + assert adapter.to_event(evt) is None + + +def test_missing_properties_returns_none(adapter): + """No properties dict means no flood_status, so no emit.""" + evt = { + "source": "usgs", + "event_id": "13090500_height", + "severity": "routine", + "headline": "x", + "lat": 42.6, + "lon": -114.47, + "fetched_at": time.time(), + } + assert adapter.to_event(evt) is None + + +def test_does_not_raise_on_corrupted_dict(adapter): + """Corrupted dict returns None without raising.""" + assert adapter.to_event({"garbage": True}) is None