mirror of
https://github.com/zvx-echo6/meshai.git
synced 2026-06-10 17:04:45 +02:00
feat(notifications): Phase 2.12 SWPC space weather adapter + dedup fix
Wires the NOAA SWPC adapter into the notification EventBus and fixes a
dedup bug in its event id, following the Phase 2.7-2.11 pattern.
(A) DEDUP FIX (the regression this phase guards):
before: event_id = f"swpc_r{r_scale}_{int(time.time())}"
after: event_id = f"swpc_{code}{level}" # e.g. "swpc_g3"
The old id embedded int(time.time()), so every poll produced a unique id.
The store dedups env events on (source, event_id), so each tick during a
blackout was treated as new -> re-emitted to the bus every scales poll
(300s) and accumulated phantom entries in the store. The new id is stable
per condition: a sustained storm coalesces across ticks; only an
escalation to a new level (e.g. G3 -> G4) yields a new id and re-notifies.
Re-emit suppression is the Inhibitor's job (TTL ~1800s), not the id's.
(B) _update_events expanded R-scale-only -> all three NOAA scales:
- R (Radio Blackout) -> category rf_propagation_alert
- S (Solar Radiation Storm) -> category solar_radiation_storm
- G (Geomagnetic Storm) -> category geomagnetic_storm
Emit threshold: level >= 1 (level 0 / quiet emits nothing). Severity is
tiered in _update_events and passed through by to_event:
level 1-2 -> routine, 3-4 -> priority, 5 -> immediate.
(Scope/threshold approved by Matt before applying: "R/S/G at level >= 1".)
Each event carries scale/level discriminator fields for to_event.
(C) to_event(): category from scale, severity pass-through, group_key /
inhibit_keys = the stable event_id (single key; tiering -> Inhibitor).
SWPC conditions are global, so the Event carries lat=None, lon=None and
region="global" (Event.lat/lon are Optional and Event has a region field).
Defensive: missing scale, level<1, or missing event_id -> None;
try/except-guarded.
No store.py change: store already routes swpc through to_event in _ingest
(the swpc special-case) and the Phase 2.9 None-guard handles None returns.
Rule 17: no new tunable. Rule 18 N/A -- SWPC services.swpc.noaa.gov is
keyless (no .env entry; .ref credentials has no SWPC/NOAA key, confirming
none needed). Rule 16: standalone fetch path validated in-container.
Tests: tests/test_adapter_swpc.py (14 tests) mirrors the 2.11 shape --
scale->category mapping, severity pass-through, _update_events severity
tiering (1-2/3-4/5), group_key/inhibit_keys, all-three-scales-emit,
quiet-emits-nothing, field population (lat/lon None + region global), and
defensive cases (missing scale / level 0 / missing id / corrupted -> None).
Plus two dedup regression guards: test_dedup_id_stable_across_ticks
(SAME id across two ticks of the same condition -- fails on the old code)
and test_event_id_changes_with_level (escalation yields a new id). Full
suite: 214 passed.
Live smoke test (prod container, Phase 2.12 code rebuilt in): clean
startup, 7 env adapters loaded, healthy, no traceback, no SWPC errors. An
in-container standalone fetch of the noaa-scales endpoint succeeded
(scales_fetch_ok=true, is_loaded=true, last_error=null,
consecutive_errors=0) over the open API with no DNS/auth errors (Phase
2.6.6 DNS fix). Current conditions are quiet (R0/S0/G0), so no Event is
emitted -- acceptable, and it exercises the level<1 -> no-emit path live.
The emission path (active scale -> rf_propagation_alert / geomagnetic_storm
/ solar_radiation_storm) is unit-validated and uses the same store->bus
path that emitted live for NWS, traffic, and NIFC fires.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c111211850
commit
dda8b8f96f
2 changed files with 305 additions and 11 deletions
110
meshai/env/swpc.py
vendored
110
meshai/env/swpc.py
vendored
|
|
@ -3,10 +3,12 @@
|
|||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from meshai.notifications.events import Event, make_event
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..config import SWPCConfig
|
||||
|
||||
|
|
@ -235,22 +237,108 @@ class SWPCAdapter:
|
|||
pass
|
||||
|
||||
def _update_events(self):
|
||||
"""Generate events for significant space weather conditions."""
|
||||
# Generate events for R-scale >= 3 (radio blackout)
|
||||
"""Generate events for active space weather conditions (R/S/G scales).
|
||||
|
||||
One event per active NOAA scale (Radio blackout / Solar radiation /
|
||||
Geomagnetic storm) at level >= 1. The event_id is a STABLE
|
||||
"swpc_{scale}{level}" key (no timestamp), so a sustained condition
|
||||
dedups across ticks and only an escalation to a new level re-notifies.
|
||||
"""
|
||||
self._events = []
|
||||
r_scale = self._status.get("r_scale", 0)
|
||||
if r_scale >= 3:
|
||||
now = time.time()
|
||||
|
||||
scale_defs = [
|
||||
("r", "r_scale", "Radio Blackout"),
|
||||
("s", "s_scale", "Solar Radiation Storm"),
|
||||
("g", "g_scale", "Geomagnetic Storm"),
|
||||
]
|
||||
|
||||
for code, status_key, label in scale_defs:
|
||||
level = self._status.get(status_key, 0) or 0
|
||||
if level < 1:
|
||||
continue
|
||||
|
||||
if level >= 5:
|
||||
severity = "immediate"
|
||||
elif level >= 3:
|
||||
severity = "priority"
|
||||
else:
|
||||
severity = "routine"
|
||||
|
||||
scale_letter = code.upper()
|
||||
self._events.append({
|
||||
"source": "swpc",
|
||||
"event_id": f"swpc_r{r_scale}_{int(time.time())}",
|
||||
"event_type": f"R{r_scale} Radio Blackout",
|
||||
"severity": "priority" if r_scale >= 3 else "routine",
|
||||
"headline": f"R{r_scale} Radio Blackout in progress",
|
||||
"expires": time.time() + 3600, # 1hr TTL
|
||||
"event_id": f"swpc_{code}{level}", # STABLE: scale+level, no timestamp
|
||||
"event_type": f"{scale_letter}{level} {label}",
|
||||
"scale": scale_letter,
|
||||
"level": level,
|
||||
"severity": severity,
|
||||
"headline": f"{scale_letter}{level} {label} in progress",
|
||||
"expires": now + 3600, # 1hr TTL
|
||||
"areas": [],
|
||||
"fetched_at": time.time(),
|
||||
"fetched_at": now,
|
||||
})
|
||||
|
||||
def to_event(self, evt: dict) -> Optional["Event"]:
|
||||
"""Translate a stored SWPC scale condition into a pipeline Event.
|
||||
|
||||
Category is chosen from the NOAA scale; severity (level-tiered) is
|
||||
passed through unchanged. SWPC conditions are global, so the Event
|
||||
carries no lat/lon and is tagged region="global".
|
||||
|
||||
Args:
|
||||
evt: Internal event dict from get_events()
|
||||
|
||||
Returns:
|
||||
Event instance ready for EventBus emission, or None if the dict is
|
||||
missing its scale/level (or level < 1) or event_id.
|
||||
"""
|
||||
try:
|
||||
scale = evt.get("scale")
|
||||
if not scale:
|
||||
return None # No scale discriminator
|
||||
|
||||
level = evt.get("level")
|
||||
if level is None or level < 1:
|
||||
return None # Quiet/baseline -- not actionable
|
||||
|
||||
event_id = evt.get("event_id")
|
||||
if not event_id:
|
||||
return None # No stable identity to group/inhibit on
|
||||
|
||||
category = {
|
||||
"R": "rf_propagation_alert",
|
||||
"S": "solar_radiation_storm",
|
||||
"G": "geomagnetic_storm",
|
||||
}.get(scale)
|
||||
if category is None:
|
||||
return None # Unknown scale
|
||||
|
||||
severity = evt.get("severity", "routine")
|
||||
title = evt.get("headline") or evt.get("event_type") or f"{scale}{level} space weather"
|
||||
|
||||
# event_id is the stable "swpc_{scale}{level}" key. A sustained
|
||||
# condition coalesces on this group_key (re-polls dedup); an
|
||||
# escalation to a higher level yields a new key and re-notifies.
|
||||
# Single inhibit key; severity tiering delegated to the Inhibitor.
|
||||
return make_event(
|
||||
source="swpc",
|
||||
category=category,
|
||||
severity=severity,
|
||||
title=title,
|
||||
summary=title,
|
||||
timestamp=evt.get("fetched_at"),
|
||||
expires=evt.get("expires"),
|
||||
lat=None,
|
||||
lon=None,
|
||||
region="global",
|
||||
group_key=event_id,
|
||||
inhibit_keys=[event_id],
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(f"SWPC to_event failed for evt: {evt.get('event_id')}")
|
||||
return None
|
||||
|
||||
def get_status(self) -> dict:
|
||||
"""Get current SWPC status."""
|
||||
return self._status
|
||||
|
|
|
|||
206
tests/test_adapter_swpc.py
Normal file
206
tests/test_adapter_swpc.py
Normal file
|
|
@ -0,0 +1,206 @@
|
|||
"""Tests for SWPC space weather adapter Phase 2.12 — to_event() + dedup fix."""
|
||||
|
||||
import time
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from meshai.env.swpc import SWPCAdapter
|
||||
from meshai.notifications.events import Event
|
||||
|
||||
|
||||
# ============================================================
|
||||
# FIXTURES
|
||||
# ============================================================
|
||||
|
||||
@pytest.fixture
|
||||
def mock_config():
|
||||
"""Create a mock SWPCConfig."""
|
||||
return MagicMock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def adapter(mock_config):
|
||||
"""Create a SWPCAdapter with mocked config."""
|
||||
return SWPCAdapter(mock_config)
|
||||
|
||||
|
||||
def make_swpc_event(
|
||||
scale="G",
|
||||
level=3,
|
||||
severity="priority",
|
||||
headline=None,
|
||||
):
|
||||
"""Helper to create a stored SWPC event dict (mirrors _update_events)."""
|
||||
now = time.time()
|
||||
label = {"R": "Radio Blackout", "S": "Solar Radiation Storm", "G": "Geomagnetic Storm"}[scale]
|
||||
if headline is None:
|
||||
headline = f"{scale}{level} {label} in progress"
|
||||
return {
|
||||
"source": "swpc",
|
||||
"event_id": f"swpc_{scale.lower()}{level}",
|
||||
"event_type": f"{scale}{level} {label}",
|
||||
"scale": scale,
|
||||
"level": level,
|
||||
"severity": severity,
|
||||
"headline": headline,
|
||||
"expires": now + 3600,
|
||||
"areas": [],
|
||||
"fetched_at": now,
|
||||
}
|
||||
|
||||
|
||||
# ============================================================
|
||||
# CATEGORY TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_scale_categories(adapter):
|
||||
"""Each NOAA scale maps to its category."""
|
||||
cases = {
|
||||
"R": "rf_propagation_alert",
|
||||
"S": "solar_radiation_storm",
|
||||
"G": "geomagnetic_storm",
|
||||
}
|
||||
for scale, category in cases.items():
|
||||
event = adapter.to_event(make_swpc_event(scale=scale, level=3))
|
||||
assert event is not None
|
||||
assert event.category == category
|
||||
|
||||
|
||||
# ============================================================
|
||||
# SEVERITY TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_severity_passes_through(adapter):
|
||||
"""Severity from the stored event passes through unchanged."""
|
||||
for sev in ["routine", "priority", "immediate"]:
|
||||
event = adapter.to_event(make_swpc_event(severity=sev))
|
||||
assert event is not None
|
||||
assert event.severity == sev
|
||||
|
||||
|
||||
def test_update_events_severity_tiering(adapter):
|
||||
"""_update_events tiers severity: 1-2 routine, 3-4 priority, 5 immediate."""
|
||||
expected = {1: "routine", 2: "routine", 3: "priority", 4: "priority", 5: "immediate"}
|
||||
for level, sev in expected.items():
|
||||
adapter._status = {"g_scale": level}
|
||||
adapter._update_events()
|
||||
evs = adapter.get_events()
|
||||
assert len(evs) == 1
|
||||
assert evs[0]["severity"] == sev
|
||||
|
||||
|
||||
# ============================================================
|
||||
# DEDUP REGRESSION TEST (the bug we are fixing)
|
||||
# ============================================================
|
||||
|
||||
def test_dedup_id_stable_across_ticks(adapter):
|
||||
"""REGRESSION: a sustained condition keeps the SAME event_id across ticks.
|
||||
|
||||
The old code embedded int(time.time()) in event_id, making every tick
|
||||
unique and defeating the store's (source, event_id) dedup.
|
||||
"""
|
||||
adapter._status = {"r_scale": 3, "s_scale": 0, "g_scale": 0}
|
||||
adapter._update_events()
|
||||
id1 = adapter.get_events()[0]["event_id"]
|
||||
time.sleep(0.01) # wall clock advances
|
||||
adapter._update_events()
|
||||
id2 = adapter.get_events()[0]["event_id"]
|
||||
assert id1 == id2
|
||||
assert id1 == "swpc_r3"
|
||||
|
||||
|
||||
def test_event_id_changes_with_level(adapter):
|
||||
"""An escalation to a new level produces a new (stable) event_id."""
|
||||
adapter._status = {"g_scale": 3}
|
||||
adapter._update_events()
|
||||
id_g3 = adapter.get_events()[0]["event_id"]
|
||||
adapter._status = {"g_scale": 5}
|
||||
adapter._update_events()
|
||||
id_g5 = adapter.get_events()[0]["event_id"]
|
||||
assert id_g3 == "swpc_g3"
|
||||
assert id_g5 == "swpc_g5"
|
||||
assert id_g3 != id_g5
|
||||
|
||||
|
||||
# ============================================================
|
||||
# EMISSION SCOPE TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_all_three_scales_emit_when_active(adapter):
|
||||
"""R, S, and G each produce an event when active (level >= 1)."""
|
||||
adapter._status = {"r_scale": 2, "s_scale": 1, "g_scale": 4}
|
||||
adapter._update_events()
|
||||
scales = {e["scale"] for e in adapter.get_events()}
|
||||
assert scales == {"R", "S", "G"}
|
||||
|
||||
|
||||
def test_quiet_conditions_emit_nothing(adapter):
|
||||
"""All scales at 0 (quiet) produce no events."""
|
||||
adapter._status = {"r_scale": 0, "s_scale": 0, "g_scale": 0}
|
||||
adapter._update_events()
|
||||
assert adapter.get_events() == []
|
||||
|
||||
|
||||
# ============================================================
|
||||
# GROUP KEY / INHIBIT KEY TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_group_key_is_event_id(adapter):
|
||||
"""Group key is the stable swpc_{scale}{level} key."""
|
||||
event = adapter.to_event(make_swpc_event(scale="G", level=3))
|
||||
assert event is not None
|
||||
assert event.group_key == "swpc_g3"
|
||||
|
||||
|
||||
def test_inhibit_keys_match_group_key(adapter):
|
||||
"""The sole inhibit key equals the group key (Inhibitor does severity tiering)."""
|
||||
event = adapter.to_event(make_swpc_event())
|
||||
assert event is not None
|
||||
assert event.inhibit_keys == [event.group_key]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# CONTENT / FIELD POPULATION TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_populates_core_fields_global(adapter):
|
||||
"""Core fields populate; SWPC is global so lat/lon are None, region set."""
|
||||
evt = make_swpc_event(scale="G", level=4)
|
||||
event = adapter.to_event(evt)
|
||||
assert event is not None
|
||||
assert event.source == "swpc"
|
||||
assert event.lat is None
|
||||
assert event.lon is None
|
||||
assert event.region == "global"
|
||||
assert event.expires == evt["expires"]
|
||||
assert event.timestamp == evt["fetched_at"]
|
||||
assert event.id # auto-computed
|
||||
|
||||
|
||||
# ============================================================
|
||||
# DEFENSIVE / NON-EMIT TESTS
|
||||
# ============================================================
|
||||
|
||||
def test_missing_scale_returns_none(adapter):
|
||||
"""Missing scale discriminator returns None."""
|
||||
evt = make_swpc_event()
|
||||
evt["scale"] = None
|
||||
assert adapter.to_event(evt) is None
|
||||
|
||||
|
||||
def test_level_zero_returns_none(adapter):
|
||||
"""A level-0 (quiet) condition returns None."""
|
||||
assert adapter.to_event(make_swpc_event(level=0, severity="routine")) is None
|
||||
|
||||
|
||||
def test_missing_event_id_returns_none(adapter):
|
||||
"""Missing event_id returns None (no stable group key)."""
|
||||
evt = make_swpc_event()
|
||||
evt["event_id"] = None
|
||||
assert adapter.to_event(evt) is None
|
||||
|
||||
|
||||
def test_does_not_raise_on_corrupted_dict(adapter):
|
||||
"""Corrupted dict returns None without raising."""
|
||||
assert adapter.to_event({"garbage": True}) is None
|
||||
Loading…
Add table
Add a link
Reference in a new issue