diff --git a/meshai/env/swpc.py b/meshai/env/swpc.py index cf87cf8..494f5a0 100644 --- a/meshai/env/swpc.py +++ b/meshai/env/swpc.py @@ -3,10 +3,12 @@ import json import logging import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen +from meshai.notifications.events import Event, make_event + if TYPE_CHECKING: from ..config import SWPCConfig @@ -235,22 +237,108 @@ class SWPCAdapter: pass def _update_events(self): - """Generate events for significant space weather conditions.""" - # Generate events for R-scale >= 3 (radio blackout) + """Generate events for active space weather conditions (R/S/G scales). + + One event per active NOAA scale (Radio blackout / Solar radiation / + Geomagnetic storm) at level >= 1. The event_id is a STABLE + "swpc_{scale}{level}" key (no timestamp), so a sustained condition + dedups across ticks and only an escalation to a new level re-notifies. + """ self._events = [] - r_scale = self._status.get("r_scale", 0) - if r_scale >= 3: + now = time.time() + + scale_defs = [ + ("r", "r_scale", "Radio Blackout"), + ("s", "s_scale", "Solar Radiation Storm"), + ("g", "g_scale", "Geomagnetic Storm"), + ] + + for code, status_key, label in scale_defs: + level = self._status.get(status_key, 0) or 0 + if level < 1: + continue + + if level >= 5: + severity = "immediate" + elif level >= 3: + severity = "priority" + else: + severity = "routine" + + scale_letter = code.upper() self._events.append({ "source": "swpc", - "event_id": f"swpc_r{r_scale}_{int(time.time())}", - "event_type": f"R{r_scale} Radio Blackout", - "severity": "priority" if r_scale >= 3 else "routine", - "headline": f"R{r_scale} Radio Blackout in progress", - "expires": time.time() + 3600, # 1hr TTL + "event_id": f"swpc_{code}{level}", # STABLE: scale+level, no timestamp + "event_type": f"{scale_letter}{level} {label}", + "scale": scale_letter, + "level": level, + "severity": severity, + "headline": f"{scale_letter}{level} {label} in progress", + "expires": now + 3600, # 1hr TTL "areas": [], - "fetched_at": time.time(), + "fetched_at": now, }) + def to_event(self, evt: dict) -> Optional["Event"]: + """Translate a stored SWPC scale condition into a pipeline Event. + + Category is chosen from the NOAA scale; severity (level-tiered) is + passed through unchanged. SWPC conditions are global, so the Event + carries no lat/lon and is tagged region="global". + + Args: + evt: Internal event dict from get_events() + + Returns: + Event instance ready for EventBus emission, or None if the dict is + missing its scale/level (or level < 1) or event_id. + """ + try: + scale = evt.get("scale") + if not scale: + return None # No scale discriminator + + level = evt.get("level") + if level is None or level < 1: + return None # Quiet/baseline -- not actionable + + event_id = evt.get("event_id") + if not event_id: + return None # No stable identity to group/inhibit on + + category = { + "R": "rf_propagation_alert", + "S": "solar_radiation_storm", + "G": "geomagnetic_storm", + }.get(scale) + if category is None: + return None # Unknown scale + + severity = evt.get("severity", "routine") + title = evt.get("headline") or evt.get("event_type") or f"{scale}{level} space weather" + + # event_id is the stable "swpc_{scale}{level}" key. A sustained + # condition coalesces on this group_key (re-polls dedup); an + # escalation to a higher level yields a new key and re-notifies. + # Single inhibit key; severity tiering delegated to the Inhibitor. + return make_event( + source="swpc", + category=category, + severity=severity, + title=title, + summary=title, + timestamp=evt.get("fetched_at"), + expires=evt.get("expires"), + lat=None, + lon=None, + region="global", + group_key=event_id, + inhibit_keys=[event_id], + ) + except Exception: + logger.exception(f"SWPC to_event failed for evt: {evt.get('event_id')}") + return None + def get_status(self) -> dict: """Get current SWPC status.""" return self._status diff --git a/tests/test_adapter_swpc.py b/tests/test_adapter_swpc.py new file mode 100644 index 0000000..6359083 --- /dev/null +++ b/tests/test_adapter_swpc.py @@ -0,0 +1,206 @@ +"""Tests for SWPC space weather adapter Phase 2.12 — to_event() + dedup fix.""" + +import time +from unittest.mock import MagicMock + +import pytest + +from meshai.env.swpc import SWPCAdapter +from meshai.notifications.events import Event + + +# ============================================================ +# FIXTURES +# ============================================================ + +@pytest.fixture +def mock_config(): + """Create a mock SWPCConfig.""" + return MagicMock() + + +@pytest.fixture +def adapter(mock_config): + """Create a SWPCAdapter with mocked config.""" + return SWPCAdapter(mock_config) + + +def make_swpc_event( + scale="G", + level=3, + severity="priority", + headline=None, +): + """Helper to create a stored SWPC event dict (mirrors _update_events).""" + now = time.time() + label = {"R": "Radio Blackout", "S": "Solar Radiation Storm", "G": "Geomagnetic Storm"}[scale] + if headline is None: + headline = f"{scale}{level} {label} in progress" + return { + "source": "swpc", + "event_id": f"swpc_{scale.lower()}{level}", + "event_type": f"{scale}{level} {label}", + "scale": scale, + "level": level, + "severity": severity, + "headline": headline, + "expires": now + 3600, + "areas": [], + "fetched_at": now, + } + + +# ============================================================ +# CATEGORY TESTS +# ============================================================ + +def test_scale_categories(adapter): + """Each NOAA scale maps to its category.""" + cases = { + "R": "rf_propagation_alert", + "S": "solar_radiation_storm", + "G": "geomagnetic_storm", + } + for scale, category in cases.items(): + event = adapter.to_event(make_swpc_event(scale=scale, level=3)) + assert event is not None + assert event.category == category + + +# ============================================================ +# SEVERITY TESTS +# ============================================================ + +def test_severity_passes_through(adapter): + """Severity from the stored event passes through unchanged.""" + for sev in ["routine", "priority", "immediate"]: + event = adapter.to_event(make_swpc_event(severity=sev)) + assert event is not None + assert event.severity == sev + + +def test_update_events_severity_tiering(adapter): + """_update_events tiers severity: 1-2 routine, 3-4 priority, 5 immediate.""" + expected = {1: "routine", 2: "routine", 3: "priority", 4: "priority", 5: "immediate"} + for level, sev in expected.items(): + adapter._status = {"g_scale": level} + adapter._update_events() + evs = adapter.get_events() + assert len(evs) == 1 + assert evs[0]["severity"] == sev + + +# ============================================================ +# DEDUP REGRESSION TEST (the bug we are fixing) +# ============================================================ + +def test_dedup_id_stable_across_ticks(adapter): + """REGRESSION: a sustained condition keeps the SAME event_id across ticks. + + The old code embedded int(time.time()) in event_id, making every tick + unique and defeating the store's (source, event_id) dedup. + """ + adapter._status = {"r_scale": 3, "s_scale": 0, "g_scale": 0} + adapter._update_events() + id1 = adapter.get_events()[0]["event_id"] + time.sleep(0.01) # wall clock advances + adapter._update_events() + id2 = adapter.get_events()[0]["event_id"] + assert id1 == id2 + assert id1 == "swpc_r3" + + +def test_event_id_changes_with_level(adapter): + """An escalation to a new level produces a new (stable) event_id.""" + adapter._status = {"g_scale": 3} + adapter._update_events() + id_g3 = adapter.get_events()[0]["event_id"] + adapter._status = {"g_scale": 5} + adapter._update_events() + id_g5 = adapter.get_events()[0]["event_id"] + assert id_g3 == "swpc_g3" + assert id_g5 == "swpc_g5" + assert id_g3 != id_g5 + + +# ============================================================ +# EMISSION SCOPE TESTS +# ============================================================ + +def test_all_three_scales_emit_when_active(adapter): + """R, S, and G each produce an event when active (level >= 1).""" + adapter._status = {"r_scale": 2, "s_scale": 1, "g_scale": 4} + adapter._update_events() + scales = {e["scale"] for e in adapter.get_events()} + assert scales == {"R", "S", "G"} + + +def test_quiet_conditions_emit_nothing(adapter): + """All scales at 0 (quiet) produce no events.""" + adapter._status = {"r_scale": 0, "s_scale": 0, "g_scale": 0} + adapter._update_events() + assert adapter.get_events() == [] + + +# ============================================================ +# GROUP KEY / INHIBIT KEY TESTS +# ============================================================ + +def test_group_key_is_event_id(adapter): + """Group key is the stable swpc_{scale}{level} key.""" + event = adapter.to_event(make_swpc_event(scale="G", level=3)) + assert event is not None + assert event.group_key == "swpc_g3" + + +def test_inhibit_keys_match_group_key(adapter): + """The sole inhibit key equals the group key (Inhibitor does severity tiering).""" + event = adapter.to_event(make_swpc_event()) + assert event is not None + assert event.inhibit_keys == [event.group_key] + + +# ============================================================ +# CONTENT / FIELD POPULATION TESTS +# ============================================================ + +def test_populates_core_fields_global(adapter): + """Core fields populate; SWPC is global so lat/lon are None, region set.""" + evt = make_swpc_event(scale="G", level=4) + event = adapter.to_event(evt) + assert event is not None + assert event.source == "swpc" + assert event.lat is None + assert event.lon is None + assert event.region == "global" + assert event.expires == evt["expires"] + assert event.timestamp == evt["fetched_at"] + assert event.id # auto-computed + + +# ============================================================ +# DEFENSIVE / NON-EMIT TESTS +# ============================================================ + +def test_missing_scale_returns_none(adapter): + """Missing scale discriminator returns None.""" + evt = make_swpc_event() + evt["scale"] = None + assert adapter.to_event(evt) is None + + +def test_level_zero_returns_none(adapter): + """A level-0 (quiet) condition returns None.""" + assert adapter.to_event(make_swpc_event(level=0, severity="routine")) is None + + +def test_missing_event_id_returns_none(adapter): + """Missing event_id returns None (no stable group key).""" + evt = make_swpc_event() + evt["event_id"] = None + assert adapter.to_event(evt) is None + + +def test_does_not_raise_on_corrupted_dict(adapter): + """Corrupted dict returns None without raising.""" + assert adapter.to_event({"garbage": True}) is None