From 95dc938c2a998f2a106e25cf9526bf43ce4e92ea Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Fri, 15 May 2026 04:47:31 +0000 Subject: [PATCH] feat(notifications): Phase 2.6 NWS adapter pipeline integration Wires the NWS adapter to the new notification pipeline via EventBus: - Added fine-grained weather categories: weather_watch, weather_advisory, weather_statement (all routine severity) alongside existing weather_warning - NWSAlertsAdapter._derive_category() maps NWS event type suffix to category: "Warning" -> weather_warning, "Watch" -> weather_watch, etc. - NWSAlertsAdapter.to_event() converts internal event dict to pipeline Event with proper group_key (event_id) and inhibit_keys (Warning suppresses Watch) - EnvironmentalStore accepts optional event_bus parameter - EnvironmentalStore._ingest() emits new events to bus via _emit_event() - 22 new tests in test_adapter_nws.py covering category derivation, severity mapping, and Event field population All 119 tests pass. Co-Authored-By: Claude Opus 4.5 --- meshai/env/nws.py | 69 ++++++- meshai/env/store.py | 32 +++- meshai/notifications/categories.py | 25 ++- tests/test_adapter_nws.py | 277 +++++++++++++++++++++++++++++ 4 files changed, 396 insertions(+), 7 deletions(-) create mode 100644 tests/test_adapter_nws.py diff --git a/meshai/env/nws.py b/meshai/env/nws.py index af37a84..041dc47 100644 --- a/meshai/env/nws.py +++ b/meshai/env/nws.py @@ -4,10 +4,12 @@ import json import logging import time from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen +from meshai.notifications.events import Event, make_event + if TYPE_CHECKING: from ..config import NWSConfig @@ -39,6 +41,71 @@ class NWSAlertsAdapter: else: # moderate, minor, unknown return "routine" + def _derive_category(self, event_type: str) -> str: + """Derive notification category from NWS event type suffix. + + NWS event types like "Red Flag Warning", "Winter Storm Watch", + "Wind Advisory" map to our fine-grained weather categories. + + Args: + event_type: NWS event type string (e.g., "Tornado Warning") + + Returns: + Category key: weather_warning, weather_watch, weather_advisory, + or weather_statement + """ + event_type_lower = event_type.lower() + if event_type_lower.endswith("warning"): + return "weather_warning" + elif event_type_lower.endswith("watch"): + return "weather_watch" + elif event_type_lower.endswith("advisory"): + return "weather_advisory" + else: + # Covers "Special Weather Statement", "Short Term Forecast", etc. + return "weather_statement" + + def to_event(self, raw: dict) -> Event: + """Convert internal event dict to pipeline Event. + + Args: + raw: Internal event dict from get_events() + + Returns: + Event instance ready for EventBus emission + """ + event_type = raw.get("event_type", "Unknown") + category = self._derive_category(event_type) + nws_severity = raw.get("severity", "unknown") + severity = self._map_nws_severity(nws_severity) + + # Build group_key for dedup: same alert ID should merge + group_key = raw.get("event_id", "") + + # Build inhibit_keys: a Warning supersedes Watch/Advisory for same hazard + inhibit_keys = [] + if category == "weather_warning": + # Warning inhibits corresponding Watch/Advisory + base = event_type.rsplit(" ", 1)[0] if " " in event_type else event_type + inhibit_keys = [f"nws:{base} Watch", f"nws:{base} Advisory"] + + return make_event( + source="nws", + category=category, + severity=severity, + title=raw.get("headline", event_type), + summary=raw.get("headline", ""), + body=raw.get("description", ""), + effective=raw.get("onset") or None, + expires=raw.get("expires") or None, + lat=raw.get("lat"), + lon=raw.get("lon"), + nws_zones=raw.get("areas", []), + group_key=group_key, + inhibit_keys=inhibit_keys, + data=raw, + ) + def tick(self) -> bool: """Execute one polling tick. diff --git a/meshai/env/store.py b/meshai/env/store.py index cb05569..a6ea2fd 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -2,10 +2,11 @@ import logging import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from ..config import EnvironmentalConfig + from ..notifications.pipeline import EventBus logger = logging.getLogger(__name__) @@ -13,9 +14,15 @@ logger = logging.getLogger(__name__) class EnvironmentalStore: """Cache and tick-driver for all environmental feed adapters.""" - def __init__(self, config: "EnvironmentalConfig", region_anchors: list = None): + def __init__( + self, + config: "EnvironmentalConfig", + region_anchors: list = None, + event_bus: Optional["EventBus"] = None, + ): self._adapters = {} # name -> adapter instance self._events = {} # (source, event_id) -> event dict + self._event_bus = event_bus # Pipeline EventBus for emission self._swpc_status = {} # Kp/SFI/scales snapshot self._ducting_status = {} # tropo ducting assessment self._mesh_zones = config.nws_zones or [] @@ -87,12 +94,29 @@ class EnvironmentalStore: self._swpc_status = adapter.get_status() # Also ingest any alert events (R-scale >= 3) for evt in adapter.get_events(): - self._events[(evt["source"], evt["event_id"])] = evt + key = (evt["source"], evt["event_id"]) + is_new = key not in self._events + self._events[key] = evt + if is_new and self._event_bus and hasattr(adapter, "to_event"): + self._emit_event(adapter, evt) elif name == "ducting": self._ducting_status = adapter.get_status() else: for evt in adapter.get_events(): - self._events[(evt["source"], evt["event_id"])] = evt + key = (evt["source"], evt["event_id"]) + is_new = key not in self._events + self._events[key] = evt + if is_new and self._event_bus and hasattr(adapter, "to_event"): + self._emit_event(adapter, evt) + + def _emit_event(self, adapter, raw_evt: dict): + """Convert raw event to pipeline Event and emit to bus.""" + try: + event = adapter.to_event(raw_evt) + self._event_bus.emit(event) + logger.debug("Emitted %s event %s to pipeline", event.source, event.id) + except Exception as e: + logger.warning("Failed to emit event to pipeline: %s", e) def _purge_expired(self): """Remove expired events.""" diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index 7275c38..156a5e5 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -174,12 +174,33 @@ ALERT_CATEGORIES = { # Environmental - Weather "weather_warning": { - "name": "Severe Weather", - "description": "NWS warning or advisory affecting your mesh area", + "name": "Severe Weather Warning", + "description": "NWS Warning affecting your mesh area — highest urgency weather alert", "default_severity": "priority", "example_message": "⚠ Red Flag Warning — Twin Falls, Cassia counties. Gusty winds, low humidity. Until May 13 04:00Z", "toggle": "weather", }, + "weather_watch": { + "name": "Weather Watch", + "description": "NWS Watch affecting your mesh area — conditions favorable for hazardous weather", + "default_severity": "routine", + "example_message": "⏳ Winter Storm Watch — Wood River Valley. Heavy snow possible Thu night through Fri.", + "toggle": "weather", + }, + "weather_advisory": { + "name": "Weather Advisory", + "description": "NWS Advisory affecting your mesh area — weather may cause inconvenience", + "default_severity": "routine", + "example_message": "ℹ Wind Advisory — Magic Valley. SW winds 25-35 mph with gusts to 50 mph.", + "toggle": "weather", + }, + "weather_statement": { + "name": "Weather Statement", + "description": "NWS Special Weather Statement — general awareness, no specific hazard", + "default_severity": "routine", + "example_message": "📋 Special Weather Statement — Isolated thunderstorms possible this afternoon.", + "toggle": "weather", + }, # Environmental - Space Weather "hf_blackout": { diff --git a/tests/test_adapter_nws.py b/tests/test_adapter_nws.py new file mode 100644 index 0000000..ca60181 --- /dev/null +++ b/tests/test_adapter_nws.py @@ -0,0 +1,277 @@ +"""Tests for NWS adapter Phase 2.6 — to_event() and _derive_category().""" + +import time +from unittest.mock import MagicMock, patch + +import pytest + +from meshai.env.nws import NWSAlertsAdapter +from meshai.notifications.events import Event + + +# ============================================================ +# FIXTURES +# ============================================================ + +@pytest.fixture +def mock_config(): + """Create a mock NWSConfig.""" + config = MagicMock() + config.areas = ["ID"] + config.user_agent = "(test, test@example.com)" + config.severity_min = "moderate" + config.tick_seconds = 60 + return config + + +@pytest.fixture +def adapter(mock_config): + """Create an NWSAlertsAdapter with mocked config.""" + return NWSAlertsAdapter(mock_config) + + +# ============================================================ +# _derive_category TESTS +# ============================================================ + +def test_derive_category_warning(adapter): + """Warning suffix maps to weather_warning.""" + assert adapter._derive_category("Tornado Warning") == "weather_warning" + assert adapter._derive_category("Red Flag Warning") == "weather_warning" + assert adapter._derive_category("Winter Storm Warning") == "weather_warning" + + +def test_derive_category_watch(adapter): + """Watch suffix maps to weather_watch.""" + assert adapter._derive_category("Tornado Watch") == "weather_watch" + assert adapter._derive_category("Winter Storm Watch") == "weather_watch" + assert adapter._derive_category("Fire Weather Watch") == "weather_watch" + + +def test_derive_category_advisory(adapter): + """Advisory suffix maps to weather_advisory.""" + assert adapter._derive_category("Wind Advisory") == "weather_advisory" + assert adapter._derive_category("Heat Advisory") == "weather_advisory" + assert adapter._derive_category("Frost Advisory") == "weather_advisory" + + +def test_derive_category_statement(adapter): + """Non-standard suffixes map to weather_statement.""" + assert adapter._derive_category("Special Weather Statement") == "weather_statement" + assert adapter._derive_category("Short Term Forecast") == "weather_statement" + assert adapter._derive_category("Hazardous Weather Outlook") == "weather_statement" + + +def test_derive_category_case_insensitive(adapter): + """Category derivation is case-insensitive.""" + assert adapter._derive_category("TORNADO WARNING") == "weather_warning" + assert adapter._derive_category("winter storm watch") == "weather_watch" + assert adapter._derive_category("Wind ADVISORY") == "weather_advisory" + + +# ============================================================ +# to_event TESTS +# ============================================================ + +def test_to_event_returns_event_instance(adapter): + """to_event returns an Event instance.""" + raw = { + "source": "nws", + "event_id": "urn:oid:2.49.0.1.840.0.abc123", + "event_type": "Tornado Warning", + "severity": "extreme", + "headline": "Tornado Warning issued for Ada County", + "description": "A tornado warning has been issued...", + "onset": time.time(), + "expires": time.time() + 3600, + "areas": ["IDZ016"], + "lat": 43.615, + "lon": -116.2023, + } + event = adapter.to_event(raw) + assert isinstance(event, Event) + + +def test_to_event_sets_source_nws(adapter): + """to_event sets source to 'nws'.""" + raw = { + "source": "nws", + "event_id": "test-id", + "event_type": "Wind Advisory", + "severity": "moderate", + "headline": "Wind Advisory", + } + event = adapter.to_event(raw) + assert event.source == "nws" + + +def test_to_event_derives_category_from_event_type(adapter): + """to_event uses _derive_category for category field.""" + raw = { + "event_id": "test", + "event_type": "Winter Storm Watch", + "severity": "severe", + "headline": "Winter Storm Watch", + } + event = adapter.to_event(raw) + assert event.category == "weather_watch" + + +def test_to_event_maps_severity(adapter): + """to_event maps NWS severity to 3-level system.""" + # Extreme -> immediate + raw = {"event_id": "1", "event_type": "Tornado Warning", "severity": "extreme", "headline": "test"} + assert adapter.to_event(raw).severity == "immediate" + + # Severe -> priority + raw = {"event_id": "2", "event_type": "Tornado Warning", "severity": "severe", "headline": "test"} + assert adapter.to_event(raw).severity == "priority" + + # Moderate -> routine + raw = {"event_id": "3", "event_type": "Wind Advisory", "severity": "moderate", "headline": "test"} + assert adapter.to_event(raw).severity == "routine" + + +def test_to_event_sets_title_from_headline(adapter): + """to_event uses headline as title.""" + raw = { + "event_id": "test", + "event_type": "Heat Advisory", + "severity": "moderate", + "headline": "Heat Advisory issued for Magic Valley", + } + event = adapter.to_event(raw) + assert event.title == "Heat Advisory issued for Magic Valley" + + +def test_to_event_sets_body_from_description(adapter): + """to_event uses description as body.""" + raw = { + "event_id": "test", + "event_type": "Heat Advisory", + "severity": "moderate", + "headline": "Heat Advisory", + "description": "Dangerously hot conditions expected...", + } + event = adapter.to_event(raw) + assert event.body == "Dangerously hot conditions expected..." + + +def test_to_event_sets_effective_and_expires(adapter): + """to_event sets effective and expires from onset/expires.""" + onset = time.time() + expires = time.time() + 7200 + raw = { + "event_id": "test", + "event_type": "Wind Advisory", + "severity": "moderate", + "headline": "Wind Advisory", + "onset": onset, + "expires": expires, + } + event = adapter.to_event(raw) + assert event.effective == onset + assert event.expires == expires + + +def test_to_event_sets_lat_lon(adapter): + """to_event sets lat/lon from raw event.""" + raw = { + "event_id": "test", + "event_type": "Tornado Warning", + "severity": "extreme", + "headline": "Tornado Warning", + "lat": 43.615, + "lon": -116.2023, + } + event = adapter.to_event(raw) + assert event.lat == 43.615 + assert event.lon == -116.2023 + + +def test_to_event_sets_nws_zones(adapter): + """to_event sets nws_zones from areas.""" + raw = { + "event_id": "test", + "event_type": "Red Flag Warning", + "severity": "severe", + "headline": "Red Flag Warning", + "areas": ["IDZ016", "IDZ030", "IDZ031"], + } + event = adapter.to_event(raw) + assert event.nws_zones == ["IDZ016", "IDZ030", "IDZ031"] + + +def test_to_event_sets_group_key_from_event_id(adapter): + """to_event sets group_key to event_id for dedup.""" + raw = { + "event_id": "urn:oid:2.49.0.1.840.0.abc123", + "event_type": "Tornado Warning", + "severity": "extreme", + "headline": "Tornado Warning", + } + event = adapter.to_event(raw) + assert event.group_key == "urn:oid:2.49.0.1.840.0.abc123" + + +def test_to_event_warning_sets_inhibit_keys(adapter): + """Warnings set inhibit_keys for corresponding Watch/Advisory.""" + raw = { + "event_id": "test", + "event_type": "Winter Storm Warning", + "severity": "severe", + "headline": "Winter Storm Warning", + } + event = adapter.to_event(raw) + assert "nws:Winter Storm Watch" in event.inhibit_keys + assert "nws:Winter Storm Advisory" in event.inhibit_keys + + +def test_to_event_watch_no_inhibit_keys(adapter): + """Watches do not set inhibit_keys.""" + raw = { + "event_id": "test", + "event_type": "Tornado Watch", + "severity": "moderate", + "headline": "Tornado Watch", + } + event = adapter.to_event(raw) + assert event.inhibit_keys == [] + + +def test_to_event_preserves_raw_in_data(adapter): + """to_event preserves raw event dict in data field.""" + raw = { + "event_id": "test-123", + "event_type": "Wind Advisory", + "severity": "moderate", + "headline": "Wind Advisory", + "custom_field": "custom_value", + } + event = adapter.to_event(raw) + assert event.data == raw + assert event.data["custom_field"] == "custom_value" + + +# ============================================================ +# INTEGRATION: _map_nws_severity +# ============================================================ + +def test_map_nws_severity_extreme_to_immediate(adapter): + """Extreme NWS severity maps to immediate.""" + assert adapter._map_nws_severity("extreme") == "immediate" + + +def test_map_nws_severity_severe_to_priority(adapter): + """Severe NWS severity maps to priority.""" + assert adapter._map_nws_severity("severe") == "priority" + + +def test_map_nws_severity_moderate_to_routine(adapter): + """Moderate NWS severity maps to routine.""" + assert adapter._map_nws_severity("moderate") == "routine" + + +def test_map_nws_severity_minor_to_routine(adapter): + """Minor NWS severity maps to routine.""" + assert adapter._map_nws_severity("minor") == "routine"