feat(notifications): Phase 2.9 usgs water adapter pipeline integration

Adds USGSStreamsAdapter.to_event(), wiring the USGS Water Services stream
gauge adapter into the notification EventBus, following the Phase 2.7
traffic pattern.

to_event() design (emit only actionable/elevated readings):
- Category from flood_status: an exceeded stage (Minor/Moderate/Major
  Flood) -> stream_flood_warning; "Action Stage" (approaching) ->
  stream_high_water.
- A routine reading has no flood_status and is intentionally NOT emitted
  (returns None) -- the two categories are both flood-specific and routine
  gauge chatter is not actionable. This matches the spec ("category ...
  based on flood_status").
- Severity: passed through unchanged from the adapter's NWPS-stage logic
  (action->routine, minor/moderate->priority, major->immediate).
- Summary: reading value/unit + flood status.
- group_key/inhibit_keys: a single stable {site_id}_{param} key (the
  adapter's own event_id) as both. Re-polls coalesce; severity tiering is
  delegated to the pipeline Inhibitor (no severity encoded in the key).
- Defensive: missing lat/lon or event_id returns None; try/except-guarded.

store fix (meshai/env/store.py): _emit_event now skips a None return from
to_event() instead of passing it to bus.emit(). Required because usgs
returns None for the common (routine) reading; also retroactively protects
the defensive None returns of the FIRMS/traffic/roads511 adapters, which
previously would have logged a spurious "Failed to emit" warning.

Rule 17: no new tunable. usgs sites / tick_seconds / flood_thresholds
already exist in env_feeds.yaml (GUI-editable). Open API, no key, no .env
entry. Rule 16: standalone path validated end-to-end below.

Tests: tests/test_adapter_usgs.py (13 tests) mirrors test_adapter_traffic
-- category split (flood vs action), severity pass-through,
group_key/inhibit_keys, field population, and the non-emit/defensive cases
(routine -> None, missing lat/lon -> None, missing event_id -> None,
missing properties -> None, corrupted -> None). Full suite: 174 passed.

Live smoke test (prod, sites 13090500 Snake R nr Twin Falls, 13092747 Rock
Creek at Twin Falls, 13108150 Salmon Falls Creek nr Hagerman): clean
startup, 7 env adapters loaded, no traceback. "USGS streams updated: 6
readings from 3 sites" with NWPS flood stages resolved for all 3 -- fetch
succeeds over the open API with no DNS/auth errors (Phase 2.6.6 DNS fix).
All gauges currently below action stage, so flood_status is None and
to_event correctly emits nothing; the new None-guard skipped all 6 with no
error log. The emission path (elevated -> stream_flood_warning /
stream_high_water) is unit-validated and is the same store->bus path
emitting live for NWS (weather_warning/statement) and traffic
(traffic_congestion).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-27 21:58:13 +00:00
commit 4feb6a1895
3 changed files with 267 additions and 0 deletions

2
meshai/env/store.py vendored
View file

@ -113,6 +113,8 @@ class EnvironmentalStore:
"""Convert raw event to pipeline Event and emit to bus."""
try:
event = adapter.to_event(raw_evt)
if event is None:
return # adapter declined to emit (non-actionable reading)
self._event_bus.emit(event)
logger.info(
"Emitted %s event %s (%s) to pipeline bus",

72
meshai/env/usgs.py vendored
View file

@ -13,6 +13,8 @@ from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
from meshai.notifications.events import Event, make_event
if TYPE_CHECKING:
from ..config import USGSConfig
@ -435,6 +437,76 @@ class USGSStreamsAdapter:
return changed
def to_event(self, evt: dict) -> Optional["Event"]:
"""Translate a stored USGS gauge reading into a pipeline Event.
Only elevated readings are emitted: the category is chosen from
flood_status, so a routine (below-action-stage) reading -- which has
no flood_status -- is intentionally NOT emitted (returns None).
Args:
evt: Internal event dict from get_events()
Returns:
Event instance ready for EventBus emission, or None if the dict
is missing lat/lon or event_id, or the reading is not elevated.
"""
try:
lat = evt.get("lat")
lon = evt.get("lon")
if lat is None or lon is None:
return None # Can't make a useful Event without coords
event_id = evt.get("event_id")
if not event_id:
return None # No stable identity to group/inhibit on
props = evt.get("properties", {}) or {}
flood_status = props.get("flood_status")
if not flood_status:
return None # routine reading -- not actionable, do not emit
# Category from flood_status: an exceeded stage is a flood warning;
# "Action Stage" (approaching) is high water.
if "Flood" in str(flood_status):
category = "stream_flood_warning"
else: # "Action Stage"
category = "stream_high_water"
severity = evt.get("severity", "routine")
title = evt.get("headline", "") or props.get("site_name") or "Stream Gauge"
# Summary: reading value/unit and the flood status
summary_parts = [title]
value = props.get("value")
unit = props.get("unit")
if value is not None:
summary_parts.append(f"{value} {unit}".strip())
summary_parts.append(str(flood_status))
summary = " | ".join(summary_parts)[:300]
# event_id is already the stable "{site_id}_{param}" key. Re-polls of
# the same gauge/parameter coalesce on this group_key; using it as the
# sole inhibit_key lets the pipeline Inhibitor suppress lower-severity
# re-emissions while a higher-severity one is active (severity tiering
# delegated to the Inhibitor).
return make_event(
source="usgs",
category=category,
severity=severity,
title=title,
summary=summary,
timestamp=evt.get("fetched_at"),
expires=evt.get("expires"),
lat=lat,
lon=lon,
group_key=event_id,
inhibit_keys=[event_id],
)
except Exception:
logger.exception(f"USGS to_event failed for evt: {evt.get('event_id')}")
return None
def get_events(self) -> list:
"""Get current stream gauge events."""
return self._events