feat(notifications): Phase 2.13 ducting adapter threshold-crossing emission (severity-tiered, Option C)

Adds a tier-based threshold-crossing emission path to the tropospheric
ducting adapter, which was status-only until now.

EMISSION PATH (before -> after):
  before: DuctingAdapter had only get_status(); store._ingest's ducting
          branch did `self._ducting_status = adapter.get_status()` and
          emitted NOTHING -- no get_events(), no to_event(), event_count
          hardcoded 0.
  after:  the adapter derives a propagation TIER each tick (with
          hysteresis) and stages an event on tier change; get_events() +
          to_event() added; store._ingest's ducting branch now mirrors the
          swpc branch (dedup on (source, event_id) + _emit_event), so a
          tier change emits to the pipeline bus.

Option C design (severity-tiered by enhancement strength):
- Driving quantity: min M-gradient (modified refractivity gradient,
  M-units/km) the adapter already computes.
- Tiers (ascending strength): normal < super_refraction < duct <
  surface_duct.
    0 <= g < 79  -> super_refraction -> category rf_anomalous_propagation,
                    severity routine
    g < 0        -> duct (elevated)  -> category rf_ducting_enhancement,
                    severity priority
    surface_duct OR g < -100 -> strong/surface duct ->
                    category rf_ducting_enhancement, severity immediate,
                    surface flag set in the summary
    g >= 79      -> normal -> no event
- Hysteresis / anti-flap: a DEADBAND of 5 M-units (TIER_DEADBAND) on the
  two gradient boundaries (79 and 0). A tier change commits only once the
  gradient is past the boundary by the deadband, so a wiggle right at a
  threshold does not flap-trip across the 3h poll interval / 30-min
  Inhibitor TTL mismatch (the Inhibitor TTL is shorter than the poll
  interval, so anti-flap must live in the adapter). The most-severe
  surface/strong-duct tier is categorical (duct reaches the ground) and is
  intentionally NOT held back or onto by the deadband -- it fires and
  clears promptly. (Deadband = 5 M-units chosen per the 5-10 guidance.)
- Stable event_id (SWPC idiom): "ducting_{tier_code}_{lat}_{lon}", e.g.
  "ducting_duct_42.56_-114.47". A sustained tier coalesces on this
  group_key (the store dedups it); an escalation to a stronger tier yields
  a new key and re-notifies. group_key = sole inhibit_key; severity tiering
  delegated to the Inhibitor.
- Prior-state tracking: self._last_tier persists across ticks (the
  deadband needs the last committed tier); _parse_response rebuilds
  _status wholesale, so _update_events runs at the end of each parse.
- Ducting is geographic: events carry the assessment location's lat/lon
  (config.latitude/longitude). Defensive: missing/normal tier, missing
  location, or missing gradient -> None; try/except-guarded.

Rule 17: no new tunable (latitude/longitude/tick_seconds already in
env_feeds.yaml; TIER_DEADBAND is an internal constant). Rule 18 N/A --
Open-Meteo GFS (api.open-meteo.com) is keyless. Rule 16: standalone fetch
path validated in-container.

Tests: tests/test_adapter_ducting.py (19 tests) mirrors the 2.12 SWPC
shape -- tier classification (normal/super_refraction/duct/surface_duct),
severity tiering, scale->category mapping, group_key/inhibit_keys, field
population, defensive cases (normal/missing location/missing gradient/
corrupted -> None), plus regression guards: dedup id stable across
same-tier ticks, tier escalation yields a new id, and TWO deadband guards
(a sub-deadband wiggle at the 0 boundary and at the 79 boundary holds the
prior tier; surface duct is not held by the deadband). Full suite: 233
passed.

Live smoke test (prod container, Phase 2.13 code rebuilt in): clean
startup, 7 env adapters loaded (ducting already counted), healthy, no
traceback. An in-container standalone _fetch of the Open-Meteo GFS
endpoint succeeded (fetch_ok=true, is_loaded=true, last_error=null,
consecutive_errors=0) -- 3/3 repeat probes clean. The current atmosphere
is normal (min M-gradient 122.5 >= 79) so tier=normal and no Event is
emitted -- acceptable, and it exercises the no-emit path and the tier
classifier on live data. NOTE: the running container's first ducting tick
logged a transient "[SSL: UNEXPECTED_EOF_WHILE_READING]" connection error;
the immediate and repeated standalone probes all succeeded, so this was a
transient upstream TLS drop (not DNS/auth/config) and the adapter degrades
gracefully (logs, increments consecutive_errors, returns False, no crash).
The emission path (tier change -> rf_anomalous_propagation /
rf_ducting_enhancement) is unit-validated and uses the same store->bus
path that emitted live for NWS, traffic, and NIFC fires.

Follow-up (not in this change): DuctingAdapter.health_status still returns
event_count hardcoded 0; now that the adapter emits, it could report
len(self._events). Cosmetic (health endpoint only); left out to keep the
diff scoped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
K7ZVX 2026-05-28 00:01:40 +00:00
commit d3b62ad3c5
3 changed files with 427 additions and 1 deletions

187
meshai/env/ducting.py vendored
View file

@ -5,10 +5,12 @@ import logging
import math import math
import time import time
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Optional
from urllib.error import HTTPError, URLError from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
from meshai.notifications.events import Event, make_event
if TYPE_CHECKING: if TYPE_CHECKING:
from ..config import DuctingConfig from ..config import DuctingConfig
@ -36,6 +38,8 @@ class DuctingAdapter:
self._consecutive_errors = 0 self._consecutive_errors = 0
self._last_error = None self._last_error = None
self._is_loaded = False self._is_loaded = False
self._events = []
self._last_tier = None
def tick(self) -> bool: def tick(self) -> bool:
"""Execute one polling tick. """Execute one polling tick.
@ -256,6 +260,187 @@ class DuctingAdapter:
}, },
} }
self._update_events()
# Tier model (enhancement strength, ascending):
# normal < super_refraction < duct < surface_duct
_TIER_ORDER = ["normal", "super_refraction", "duct", "surface_duct"]
# Gradient (M-units/km) at the top (more-enhanced side) of each tier:
# normal/super_refraction = 79 ; super_refraction/duct = 0 ; duct/surface_duct = -100
_TIER_TOP_BOUNDARY = {0: 79.0, 1: 0.0, 2: -100.0}
TIER_DEADBAND = 5.0 # M-units of hysteresis on gradient boundaries (anti-flap)
_TIER_CATEGORY = {
"super_refraction": "rf_anomalous_propagation",
"duct": "rf_ducting_enhancement",
"surface_duct": "rf_ducting_enhancement",
}
_TIER_SEVERITY = {
"super_refraction": "routine",
"duct": "priority",
"surface_duct": "immediate",
}
_TIER_CODE = {
"super_refraction": "superrefraction",
"duct": "duct",
"surface_duct": "surfaceduct",
}
_TIER_LABEL = {
"super_refraction": "Super-refraction (enhanced range)",
"duct": "Tropospheric ducting (elevated)",
"surface_duct": "Tropospheric ducting (surface)",
}
def _raw_tier(self, min_gradient, condition) -> str:
"""Map the current min M-gradient + condition to a tier (no hysteresis)."""
if min_gradient is None or min_gradient >= 79:
return "normal"
if min_gradient >= 0:
return "super_refraction"
# min_gradient < 0 -> ducting; strongest when it reaches the surface or is very steep
if condition == "surface_duct" or min_gradient < -100:
return "surface_duct"
return "duct"
def _apply_hysteresis(self, raw_tier, min_gradient, last_tier) -> str:
"""Commit a tier change only past a deadband, to avoid boundary flapping.
The most-severe tier (surface/strong duct) is categorical (the duct
reaches the ground) and is never held back or held onto by the
gradient deadband -- it fires promptly and clears promptly.
"""
if last_tier is None or raw_tier == last_tier or min_gradient is None:
return raw_tier
if raw_tier == "surface_duct" or last_tier == "surface_duct":
return raw_tier
idx_raw = self._TIER_ORDER.index(raw_tier)
idx_last = self._TIER_ORDER.index(last_tier)
if idx_raw > idx_last:
# toward a more-enhanced tier (gradient dropped)
boundary = self._TIER_TOP_BOUNDARY[idx_last]
return raw_tier if min_gradient <= boundary - self.TIER_DEADBAND else last_tier
# toward a less-enhanced tier (gradient rose)
boundary = self._TIER_TOP_BOUNDARY[idx_last - 1]
return raw_tier if min_gradient >= boundary + self.TIER_DEADBAND else last_tier
def _update_events(self):
"""Derive the current propagation tier (with hysteresis) and stage events.
Mirrors the SWPC pattern: get_events() returns the current notable tier
every tick with a STABLE per-tier event_id, so the store dedups a
sustained tier and only a tier change re-emits. A 'normal' tier stages
nothing.
"""
self._events = []
if not self._status:
return
min_gradient = self._status.get("min_gradient")
condition = self._status.get("condition")
raw_tier = self._raw_tier(min_gradient, condition)
tier = self._apply_hysteresis(raw_tier, min_gradient, self._last_tier)
self._last_tier = tier
self._status["tier"] = tier
if tier == "normal":
return # no enhancement -- nothing to emit
now = time.time()
loc = f"{round(self._lat, 2)}_{round(self._lon, 2)}"
label = self._TIER_LABEL[tier]
assessment = self._status.get("assessment", "")
headline = f"{label} -- {assessment}".strip(" -") if assessment else label
self._events.append({
"source": "ducting",
"event_id": f"ducting_{self._TIER_CODE[tier]}_{loc}", # STABLE per tier+location
"event_type": label,
"tier": tier,
"severity": self._TIER_SEVERITY[tier],
"headline": headline,
"min_gradient": min_gradient,
"duct_base_m": self._status.get("duct_base_m"),
"duct_thickness_m": self._status.get("duct_thickness_m"),
"surface_duct": (tier == "surface_duct"),
"lat": self._lat,
"lon": self._lon,
"expires": now + 6 * 3600, # outlast the 3h poll gap so a sustained tier dedups
"fetched_at": now,
})
def get_events(self) -> list:
"""Get current propagation-enhancement events (tier-based)."""
return self._events
def to_event(self, evt: dict) -> Optional["Event"]:
"""Translate a stored ducting tier event into a pipeline Event.
Category from tier; tier-mapped severity passed through. Ducting is
geographic, so the Event carries the assessment location's lat/lon.
The stable per-tier event_id is the group_key and sole inhibit_key;
a tier escalation yields a new key and re-notifies.
Args:
evt: Internal event dict from get_events()
Returns:
Event instance, or None for a normal/missing tier, missing
location, or missing gradient.
"""
try:
tier = evt.get("tier")
if not tier or tier == "normal":
return None # no enhancement -- not actionable
lat = evt.get("lat")
lon = evt.get("lon")
if lat is None or lon is None:
return None # missing location
if evt.get("min_gradient") is None:
return None # missing the driving quantity
event_id = evt.get("event_id")
if not event_id:
return None
category = self._TIER_CATEGORY.get(tier)
if category is None:
return None # unknown tier
severity = evt.get("severity", "routine")
title = evt.get("event_type") or "Tropospheric propagation enhancement"
summary_parts = [title]
mg = evt.get("min_gradient")
if mg is not None:
summary_parts.append(f"min M-gradient {mg}/km")
base = evt.get("duct_base_m")
thick = evt.get("duct_thickness_m")
if base is not None and thick:
summary_parts.append(f"duct base {int(base)} m, ~{int(thick)} m thick")
if evt.get("surface_duct"):
summary_parts.append("surface duct")
summary = " | ".join(summary_parts)[:300]
return make_event(
source="ducting",
category=category,
severity=severity,
title=title,
summary=summary,
timestamp=evt.get("fetched_at"),
expires=evt.get("expires"),
lat=lat,
lon=lon,
group_key=event_id,
inhibit_keys=[event_id],
)
except Exception:
logger.exception(f"Ducting to_event failed for evt: {evt.get('event_id')}")
return None
def get_status(self) -> dict: def get_status(self) -> dict:
"""Get current ducting status.""" """Get current ducting status."""
return self._status return self._status

6
meshai/env/store.py vendored
View file

@ -101,6 +101,12 @@ class EnvironmentalStore:
self._emit_event(adapter, evt) self._emit_event(adapter, evt)
elif name == "ducting": elif name == "ducting":
self._ducting_status = adapter.get_status() self._ducting_status = adapter.get_status()
for evt in adapter.get_events():
key = (evt["source"], evt["event_id"])
is_new = key not in self._events
self._events[key] = evt
if is_new and self._event_bus and hasattr(adapter, "to_event"):
self._emit_event(adapter, evt)
else: else:
for evt in adapter.get_events(): for evt in adapter.get_events():
key = (evt["source"], evt["event_id"]) key = (evt["source"], evt["event_id"])

View file

@ -0,0 +1,235 @@
"""Tests for ducting adapter Phase 2.13 — tier-based threshold-crossing emission."""
import time
from unittest.mock import MagicMock
import pytest
from meshai.env.ducting import DuctingAdapter
from meshai.notifications.events import Event
# ============================================================
# FIXTURES / HELPERS
# ============================================================
def fresh_adapter():
"""A DuctingAdapter with the Twin Falls default location."""
cfg = MagicMock()
cfg.latitude = 42.56
cfg.longitude = -114.47
cfg.tick_seconds = 10800
return DuctingAdapter(cfg)
@pytest.fixture
def adapter():
return fresh_adapter()
def set_status(adapter, min_gradient, condition="normal", assessment="", base=None, thick=None):
"""Drive _update_events from a synthetic status blob (mirrors _parse_response)."""
adapter._status = {
"min_gradient": min_gradient,
"condition": condition,
"assessment": assessment,
"duct_base_m": base,
"duct_thickness_m": thick,
}
adapter._update_events()
return adapter.get_events()
def make_ducting_event(
tier="duct",
min_gradient=-30,
severity="priority",
lat=42.56,
lon=-114.47,
surface=False,
base=1500,
thick=300,
):
"""A stored ducting tier event dict (mirrors _update_events output)."""
code = {"super_refraction": "superrefraction", "duct": "duct", "surface_duct": "surfaceduct"}[tier]
label = {
"super_refraction": "Super-refraction (enhanced range)",
"duct": "Tropospheric ducting (elevated)",
"surface_duct": "Tropospheric ducting (surface)",
}[tier]
now = time.time()
return {
"source": "ducting",
"event_id": f"ducting_{code}_{round(lat, 2)}_{round(lon, 2)}",
"event_type": label,
"tier": tier,
"severity": severity,
"headline": label,
"min_gradient": min_gradient,
"duct_base_m": base,
"duct_thickness_m": thick,
"surface_duct": surface,
"lat": lat,
"lon": lon,
"expires": now + 6 * 3600,
"fetched_at": now,
}
# ============================================================
# TIER CLASSIFICATION TESTS
# ============================================================
def test_normal_emits_nothing(adapter):
"""A normal atmosphere (gradient >= 79) stages no event."""
evs = set_status(adapter, 118, "normal")
assert evs == []
assert adapter._last_tier == "normal"
def test_super_refraction_tier(adapter):
"""0 <= gradient < 79 is super-refraction (routine)."""
evs = set_status(adapter, 40, "super_refraction")
assert len(evs) == 1
assert evs[0]["tier"] == "super_refraction"
assert evs[0]["severity"] == "routine"
def test_duct_tier(adapter):
"""gradient < 0 (elevated) is ducting (priority)."""
evs = set_status(adapter, -30, "elevated_duct")
assert evs[0]["tier"] == "duct"
assert evs[0]["severity"] == "priority"
def test_surface_duct_tier(adapter):
"""A surface duct condition is the strong/surface tier (immediate)."""
evs = set_status(adapter, -30, "surface_duct")
assert evs[0]["tier"] == "surface_duct"
assert evs[0]["severity"] == "immediate"
assert evs[0]["surface_duct"] is True
def test_steep_gradient_is_surface_duct(adapter):
"""A very steep gradient (< -100) escalates to the surface/strong tier."""
evs = set_status(adapter, -150, "elevated_duct")
assert evs[0]["tier"] == "surface_duct"
def test_update_events_severity_tiering():
"""Fresh-adapter severity per tier: routine / priority / immediate."""
cases = [(40, "super_refraction", "routine"), (-30, "elevated_duct", "priority"), (-30, "surface_duct", "immediate")]
for g, cond, sev in cases:
a = fresh_adapter()
assert set_status(a, g, cond)[0]["severity"] == sev
# ============================================================
# DEDUP / ESCALATION / DEADBAND (regression guards)
# ============================================================
def test_dedup_id_stable_across_ticks(adapter):
"""REGRESSION: a sustained tier keeps the SAME event_id across ticks."""
e1 = set_status(adapter, -30, "elevated_duct")[0]["event_id"]
time.sleep(0.01)
e2 = set_status(adapter, -28, "elevated_duct")[0]["event_id"]
assert e1 == e2
assert e1 == "ducting_duct_42.56_-114.47"
def test_tier_escalation_new_event_id(adapter):
"""An escalation to a stronger tier yields a new (stable) event_id."""
e_sr = set_status(adapter, 40, "super_refraction")[0]["event_id"]
e_duct = set_status(adapter, -30, "elevated_duct")[0]["event_id"]
assert e_sr == "ducting_superrefraction_42.56_-114.47"
assert e_duct == "ducting_duct_42.56_-114.47"
assert e_sr != e_duct
def test_deadband_holds_tier_on_zero_boundary_wiggle(adapter):
"""A tiny dip just below 0 (within the 5 M-unit deadband) holds the prior tier."""
set_status(adapter, 40, "super_refraction")
assert adapter._last_tier == "super_refraction"
evs = set_status(adapter, -2, "elevated_duct") # within deadband of 0
assert adapter._last_tier == "super_refraction"
assert evs[0]["tier"] == "super_refraction"
evs2 = set_status(adapter, -8, "elevated_duct") # clearly past deadband
assert adapter._last_tier == "duct"
assert evs2[0]["tier"] == "duct"
def test_deadband_holds_normal_on_79_boundary_wiggle(adapter):
"""A dip just below 79 (within the deadband) holds normal (no event)."""
set_status(adapter, 100, "normal")
assert adapter._last_tier == "normal"
evs = set_status(adapter, 76, "super_refraction") # within deadband of 79
assert adapter._last_tier == "normal"
assert evs == []
evs2 = set_status(adapter, 70, "super_refraction") # past deadband
assert evs2[0]["tier"] == "super_refraction"
def test_surface_duct_not_held_by_deadband(adapter):
"""The categorical surface duct tier fires promptly regardless of deadband."""
set_status(adapter, 40, "super_refraction")
evs = set_status(adapter, -30, "surface_duct")
assert evs[0]["tier"] == "surface_duct"
# ============================================================
# to_event() — CATEGORY / SEVERITY / KEYS / FIELDS
# ============================================================
def test_tier_categories(adapter):
"""super-refraction -> anomalous prop; (surface_)duct -> ducting enhancement."""
assert adapter.to_event(make_ducting_event(tier="super_refraction", severity="routine")).category == "rf_anomalous_propagation"
assert adapter.to_event(make_ducting_event(tier="duct")).category == "rf_ducting_enhancement"
assert adapter.to_event(make_ducting_event(tier="surface_duct", severity="immediate", surface=True)).category == "rf_ducting_enhancement"
def test_severity_passes_through(adapter):
for sev in ["routine", "priority", "immediate"]:
assert adapter.to_event(make_ducting_event(severity=sev)).severity == sev
def test_group_key_and_inhibit_keys(adapter):
event = adapter.to_event(make_ducting_event(tier="duct"))
assert event.group_key == "ducting_duct_42.56_-114.47"
assert event.inhibit_keys == [event.group_key]
def test_populates_core_fields(adapter):
evt = make_ducting_event(tier="duct", lat=42.56, lon=-114.47)
event = adapter.to_event(evt)
assert event.source == "ducting"
assert event.lat == 42.56
assert event.lon == -114.47
assert event.expires == evt["expires"]
assert event.timestamp == evt["fetched_at"]
assert event.id # auto-computed
# ============================================================
# DEFENSIVE / NON-EMIT TESTS
# ============================================================
def test_normal_tier_returns_none(adapter):
evt = make_ducting_event()
evt["tier"] = "normal"
assert adapter.to_event(evt) is None
def test_missing_location_returns_none(adapter):
evt = make_ducting_event()
evt["lat"] = None
assert adapter.to_event(evt) is None
def test_missing_gradient_returns_none(adapter):
evt = make_ducting_event()
evt["min_gradient"] = None
assert adapter.to_event(evt) is None
def test_does_not_raise_on_corrupted_dict(adapter):
assert adapter.to_event({"garbage": True}) is None