From d3b62ad3c5cc0d0d099fed9de4f3bca6ec28b39e Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 28 May 2026 00:01:40 +0000 Subject: [PATCH] feat(notifications): Phase 2.13 ducting adapter threshold-crossing emission (severity-tiered, Option C) Adds a tier-based threshold-crossing emission path to the tropospheric ducting adapter, which was status-only until now. EMISSION PATH (before -> after): before: DuctingAdapter had only get_status(); store._ingest's ducting branch did `self._ducting_status = adapter.get_status()` and emitted NOTHING -- no get_events(), no to_event(), event_count hardcoded 0. after: the adapter derives a propagation TIER each tick (with hysteresis) and stages an event on tier change; get_events() + to_event() added; store._ingest's ducting branch now mirrors the swpc branch (dedup on (source, event_id) + _emit_event), so a tier change emits to the pipeline bus. Option C design (severity-tiered by enhancement strength): - Driving quantity: min M-gradient (modified refractivity gradient, M-units/km) the adapter already computes. - Tiers (ascending strength): normal < super_refraction < duct < surface_duct. 0 <= g < 79 -> super_refraction -> category rf_anomalous_propagation, severity routine g < 0 -> duct (elevated) -> category rf_ducting_enhancement, severity priority surface_duct OR g < -100 -> strong/surface duct -> category rf_ducting_enhancement, severity immediate, surface flag set in the summary g >= 79 -> normal -> no event - Hysteresis / anti-flap: a DEADBAND of 5 M-units (TIER_DEADBAND) on the two gradient boundaries (79 and 0). A tier change commits only once the gradient is past the boundary by the deadband, so a wiggle right at a threshold does not flap-trip across the 3h poll interval / 30-min Inhibitor TTL mismatch (the Inhibitor TTL is shorter than the poll interval, so anti-flap must live in the adapter). The most-severe surface/strong-duct tier is categorical (duct reaches the ground) and is intentionally NOT held back or onto by the deadband -- it fires and clears promptly. (Deadband = 5 M-units chosen per the 5-10 guidance.) - Stable event_id (SWPC idiom): "ducting_{tier_code}_{lat}_{lon}", e.g. "ducting_duct_42.56_-114.47". A sustained tier coalesces on this group_key (the store dedups it); an escalation to a stronger tier yields a new key and re-notifies. group_key = sole inhibit_key; severity tiering delegated to the Inhibitor. - Prior-state tracking: self._last_tier persists across ticks (the deadband needs the last committed tier); _parse_response rebuilds _status wholesale, so _update_events runs at the end of each parse. - Ducting is geographic: events carry the assessment location's lat/lon (config.latitude/longitude). Defensive: missing/normal tier, missing location, or missing gradient -> None; try/except-guarded. Rule 17: no new tunable (latitude/longitude/tick_seconds already in env_feeds.yaml; TIER_DEADBAND is an internal constant). Rule 18 N/A -- Open-Meteo GFS (api.open-meteo.com) is keyless. Rule 16: standalone fetch path validated in-container. Tests: tests/test_adapter_ducting.py (19 tests) mirrors the 2.12 SWPC shape -- tier classification (normal/super_refraction/duct/surface_duct), severity tiering, scale->category mapping, group_key/inhibit_keys, field population, defensive cases (normal/missing location/missing gradient/ corrupted -> None), plus regression guards: dedup id stable across same-tier ticks, tier escalation yields a new id, and TWO deadband guards (a sub-deadband wiggle at the 0 boundary and at the 79 boundary holds the prior tier; surface duct is not held by the deadband). Full suite: 233 passed. Live smoke test (prod container, Phase 2.13 code rebuilt in): clean startup, 7 env adapters loaded (ducting already counted), healthy, no traceback. An in-container standalone _fetch of the Open-Meteo GFS endpoint succeeded (fetch_ok=true, is_loaded=true, last_error=null, consecutive_errors=0) -- 3/3 repeat probes clean. The current atmosphere is normal (min M-gradient 122.5 >= 79) so tier=normal and no Event is emitted -- acceptable, and it exercises the no-emit path and the tier classifier on live data. NOTE: the running container's first ducting tick logged a transient "[SSL: UNEXPECTED_EOF_WHILE_READING]" connection error; the immediate and repeated standalone probes all succeeded, so this was a transient upstream TLS drop (not DNS/auth/config) and the adapter degrades gracefully (logs, increments consecutive_errors, returns False, no crash). The emission path (tier change -> rf_anomalous_propagation / rf_ducting_enhancement) is unit-validated and uses the same store->bus path that emitted live for NWS, traffic, and NIFC fires. Follow-up (not in this change): DuctingAdapter.health_status still returns event_count hardcoded 0; now that the adapter emits, it could report len(self._events). Cosmetic (health endpoint only); left out to keep the diff scoped. Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/env/ducting.py | 187 ++++++++++++++++++++++++++- meshai/env/store.py | 6 + tests/test_adapter_ducting.py | 235 ++++++++++++++++++++++++++++++++++ 3 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 tests/test_adapter_ducting.py diff --git a/meshai/env/ducting.py b/meshai/env/ducting.py index b0feeff..3b40b0e 100644 --- a/meshai/env/ducting.py +++ b/meshai/env/ducting.py @@ -5,10 +5,12 @@ import logging import math import time from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen +from meshai.notifications.events import Event, make_event + if TYPE_CHECKING: from ..config import DuctingConfig @@ -36,6 +38,8 @@ class DuctingAdapter: self._consecutive_errors = 0 self._last_error = None self._is_loaded = False + self._events = [] + self._last_tier = None def tick(self) -> bool: """Execute one polling tick. @@ -256,6 +260,187 @@ class DuctingAdapter: }, } + self._update_events() + + # Tier model (enhancement strength, ascending): + # normal < super_refraction < duct < surface_duct + _TIER_ORDER = ["normal", "super_refraction", "duct", "surface_duct"] + # Gradient (M-units/km) at the top (more-enhanced side) of each tier: + # normal/super_refraction = 79 ; super_refraction/duct = 0 ; duct/surface_duct = -100 + _TIER_TOP_BOUNDARY = {0: 79.0, 1: 0.0, 2: -100.0} + TIER_DEADBAND = 5.0 # M-units of hysteresis on gradient boundaries (anti-flap) + + _TIER_CATEGORY = { + "super_refraction": "rf_anomalous_propagation", + "duct": "rf_ducting_enhancement", + "surface_duct": "rf_ducting_enhancement", + } + _TIER_SEVERITY = { + "super_refraction": "routine", + "duct": "priority", + "surface_duct": "immediate", + } + _TIER_CODE = { + "super_refraction": "superrefraction", + "duct": "duct", + "surface_duct": "surfaceduct", + } + _TIER_LABEL = { + "super_refraction": "Super-refraction (enhanced range)", + "duct": "Tropospheric ducting (elevated)", + "surface_duct": "Tropospheric ducting (surface)", + } + + def _raw_tier(self, min_gradient, condition) -> str: + """Map the current min M-gradient + condition to a tier (no hysteresis).""" + if min_gradient is None or min_gradient >= 79: + return "normal" + if min_gradient >= 0: + return "super_refraction" + # min_gradient < 0 -> ducting; strongest when it reaches the surface or is very steep + if condition == "surface_duct" or min_gradient < -100: + return "surface_duct" + return "duct" + + def _apply_hysteresis(self, raw_tier, min_gradient, last_tier) -> str: + """Commit a tier change only past a deadband, to avoid boundary flapping. + + The most-severe tier (surface/strong duct) is categorical (the duct + reaches the ground) and is never held back or held onto by the + gradient deadband -- it fires promptly and clears promptly. + """ + if last_tier is None or raw_tier == last_tier or min_gradient is None: + return raw_tier + if raw_tier == "surface_duct" or last_tier == "surface_duct": + return raw_tier + idx_raw = self._TIER_ORDER.index(raw_tier) + idx_last = self._TIER_ORDER.index(last_tier) + if idx_raw > idx_last: + # toward a more-enhanced tier (gradient dropped) + boundary = self._TIER_TOP_BOUNDARY[idx_last] + return raw_tier if min_gradient <= boundary - self.TIER_DEADBAND else last_tier + # toward a less-enhanced tier (gradient rose) + boundary = self._TIER_TOP_BOUNDARY[idx_last - 1] + return raw_tier if min_gradient >= boundary + self.TIER_DEADBAND else last_tier + + def _update_events(self): + """Derive the current propagation tier (with hysteresis) and stage events. + + Mirrors the SWPC pattern: get_events() returns the current notable tier + every tick with a STABLE per-tier event_id, so the store dedups a + sustained tier and only a tier change re-emits. A 'normal' tier stages + nothing. + """ + self._events = [] + if not self._status: + return + + min_gradient = self._status.get("min_gradient") + condition = self._status.get("condition") + + raw_tier = self._raw_tier(min_gradient, condition) + tier = self._apply_hysteresis(raw_tier, min_gradient, self._last_tier) + self._last_tier = tier + self._status["tier"] = tier + + if tier == "normal": + return # no enhancement -- nothing to emit + + now = time.time() + loc = f"{round(self._lat, 2)}_{round(self._lon, 2)}" + label = self._TIER_LABEL[tier] + assessment = self._status.get("assessment", "") + headline = f"{label} -- {assessment}".strip(" -") if assessment else label + + self._events.append({ + "source": "ducting", + "event_id": f"ducting_{self._TIER_CODE[tier]}_{loc}", # STABLE per tier+location + "event_type": label, + "tier": tier, + "severity": self._TIER_SEVERITY[tier], + "headline": headline, + "min_gradient": min_gradient, + "duct_base_m": self._status.get("duct_base_m"), + "duct_thickness_m": self._status.get("duct_thickness_m"), + "surface_duct": (tier == "surface_duct"), + "lat": self._lat, + "lon": self._lon, + "expires": now + 6 * 3600, # outlast the 3h poll gap so a sustained tier dedups + "fetched_at": now, + }) + + def get_events(self) -> list: + """Get current propagation-enhancement events (tier-based).""" + return self._events + + def to_event(self, evt: dict) -> Optional["Event"]: + """Translate a stored ducting tier event into a pipeline Event. + + Category from tier; tier-mapped severity passed through. Ducting is + geographic, so the Event carries the assessment location's lat/lon. + The stable per-tier event_id is the group_key and sole inhibit_key; + a tier escalation yields a new key and re-notifies. + + Args: + evt: Internal event dict from get_events() + + Returns: + Event instance, or None for a normal/missing tier, missing + location, or missing gradient. + """ + try: + tier = evt.get("tier") + if not tier or tier == "normal": + return None # no enhancement -- not actionable + + lat = evt.get("lat") + lon = evt.get("lon") + if lat is None or lon is None: + return None # missing location + + if evt.get("min_gradient") is None: + return None # missing the driving quantity + + event_id = evt.get("event_id") + if not event_id: + return None + + category = self._TIER_CATEGORY.get(tier) + if category is None: + return None # unknown tier + + severity = evt.get("severity", "routine") + title = evt.get("event_type") or "Tropospheric propagation enhancement" + + summary_parts = [title] + mg = evt.get("min_gradient") + if mg is not None: + summary_parts.append(f"min M-gradient {mg}/km") + base = evt.get("duct_base_m") + thick = evt.get("duct_thickness_m") + if base is not None and thick: + summary_parts.append(f"duct base {int(base)} m, ~{int(thick)} m thick") + if evt.get("surface_duct"): + summary_parts.append("surface duct") + summary = " | ".join(summary_parts)[:300] + + return make_event( + source="ducting", + category=category, + severity=severity, + title=title, + summary=summary, + timestamp=evt.get("fetched_at"), + expires=evt.get("expires"), + lat=lat, + lon=lon, + group_key=event_id, + inhibit_keys=[event_id], + ) + except Exception: + logger.exception(f"Ducting to_event failed for evt: {evt.get('event_id')}") + return None + def get_status(self) -> dict: """Get current ducting status.""" return self._status diff --git a/meshai/env/store.py b/meshai/env/store.py index ffcdb2a..9b6ad1d 100644 --- a/meshai/env/store.py +++ b/meshai/env/store.py @@ -101,6 +101,12 @@ class EnvironmentalStore: self._emit_event(adapter, evt) elif name == "ducting": self._ducting_status = adapter.get_status() + for evt in adapter.get_events(): + key = (evt["source"], evt["event_id"]) + is_new = key not in self._events + self._events[key] = evt + if is_new and self._event_bus and hasattr(adapter, "to_event"): + self._emit_event(adapter, evt) else: for evt in adapter.get_events(): key = (evt["source"], evt["event_id"]) diff --git a/tests/test_adapter_ducting.py b/tests/test_adapter_ducting.py new file mode 100644 index 0000000..ba32aa5 --- /dev/null +++ b/tests/test_adapter_ducting.py @@ -0,0 +1,235 @@ +"""Tests for ducting adapter Phase 2.13 — tier-based threshold-crossing emission.""" + +import time +from unittest.mock import MagicMock + +import pytest + +from meshai.env.ducting import DuctingAdapter +from meshai.notifications.events import Event + + +# ============================================================ +# FIXTURES / HELPERS +# ============================================================ + +def fresh_adapter(): + """A DuctingAdapter with the Twin Falls default location.""" + cfg = MagicMock() + cfg.latitude = 42.56 + cfg.longitude = -114.47 + cfg.tick_seconds = 10800 + return DuctingAdapter(cfg) + + +@pytest.fixture +def adapter(): + return fresh_adapter() + + +def set_status(adapter, min_gradient, condition="normal", assessment="", base=None, thick=None): + """Drive _update_events from a synthetic status blob (mirrors _parse_response).""" + adapter._status = { + "min_gradient": min_gradient, + "condition": condition, + "assessment": assessment, + "duct_base_m": base, + "duct_thickness_m": thick, + } + adapter._update_events() + return adapter.get_events() + + +def make_ducting_event( + tier="duct", + min_gradient=-30, + severity="priority", + lat=42.56, + lon=-114.47, + surface=False, + base=1500, + thick=300, +): + """A stored ducting tier event dict (mirrors _update_events output).""" + code = {"super_refraction": "superrefraction", "duct": "duct", "surface_duct": "surfaceduct"}[tier] + label = { + "super_refraction": "Super-refraction (enhanced range)", + "duct": "Tropospheric ducting (elevated)", + "surface_duct": "Tropospheric ducting (surface)", + }[tier] + now = time.time() + return { + "source": "ducting", + "event_id": f"ducting_{code}_{round(lat, 2)}_{round(lon, 2)}", + "event_type": label, + "tier": tier, + "severity": severity, + "headline": label, + "min_gradient": min_gradient, + "duct_base_m": base, + "duct_thickness_m": thick, + "surface_duct": surface, + "lat": lat, + "lon": lon, + "expires": now + 6 * 3600, + "fetched_at": now, + } + + +# ============================================================ +# TIER CLASSIFICATION TESTS +# ============================================================ + +def test_normal_emits_nothing(adapter): + """A normal atmosphere (gradient >= 79) stages no event.""" + evs = set_status(adapter, 118, "normal") + assert evs == [] + assert adapter._last_tier == "normal" + + +def test_super_refraction_tier(adapter): + """0 <= gradient < 79 is super-refraction (routine).""" + evs = set_status(adapter, 40, "super_refraction") + assert len(evs) == 1 + assert evs[0]["tier"] == "super_refraction" + assert evs[0]["severity"] == "routine" + + +def test_duct_tier(adapter): + """gradient < 0 (elevated) is ducting (priority).""" + evs = set_status(adapter, -30, "elevated_duct") + assert evs[0]["tier"] == "duct" + assert evs[0]["severity"] == "priority" + + +def test_surface_duct_tier(adapter): + """A surface duct condition is the strong/surface tier (immediate).""" + evs = set_status(adapter, -30, "surface_duct") + assert evs[0]["tier"] == "surface_duct" + assert evs[0]["severity"] == "immediate" + assert evs[0]["surface_duct"] is True + + +def test_steep_gradient_is_surface_duct(adapter): + """A very steep gradient (< -100) escalates to the surface/strong tier.""" + evs = set_status(adapter, -150, "elevated_duct") + assert evs[0]["tier"] == "surface_duct" + + +def test_update_events_severity_tiering(): + """Fresh-adapter severity per tier: routine / priority / immediate.""" + cases = [(40, "super_refraction", "routine"), (-30, "elevated_duct", "priority"), (-30, "surface_duct", "immediate")] + for g, cond, sev in cases: + a = fresh_adapter() + assert set_status(a, g, cond)[0]["severity"] == sev + + +# ============================================================ +# DEDUP / ESCALATION / DEADBAND (regression guards) +# ============================================================ + +def test_dedup_id_stable_across_ticks(adapter): + """REGRESSION: a sustained tier keeps the SAME event_id across ticks.""" + e1 = set_status(adapter, -30, "elevated_duct")[0]["event_id"] + time.sleep(0.01) + e2 = set_status(adapter, -28, "elevated_duct")[0]["event_id"] + assert e1 == e2 + assert e1 == "ducting_duct_42.56_-114.47" + + +def test_tier_escalation_new_event_id(adapter): + """An escalation to a stronger tier yields a new (stable) event_id.""" + e_sr = set_status(adapter, 40, "super_refraction")[0]["event_id"] + e_duct = set_status(adapter, -30, "elevated_duct")[0]["event_id"] + assert e_sr == "ducting_superrefraction_42.56_-114.47" + assert e_duct == "ducting_duct_42.56_-114.47" + assert e_sr != e_duct + + +def test_deadband_holds_tier_on_zero_boundary_wiggle(adapter): + """A tiny dip just below 0 (within the 5 M-unit deadband) holds the prior tier.""" + set_status(adapter, 40, "super_refraction") + assert adapter._last_tier == "super_refraction" + evs = set_status(adapter, -2, "elevated_duct") # within deadband of 0 + assert adapter._last_tier == "super_refraction" + assert evs[0]["tier"] == "super_refraction" + evs2 = set_status(adapter, -8, "elevated_duct") # clearly past deadband + assert adapter._last_tier == "duct" + assert evs2[0]["tier"] == "duct" + + +def test_deadband_holds_normal_on_79_boundary_wiggle(adapter): + """A dip just below 79 (within the deadband) holds normal (no event).""" + set_status(adapter, 100, "normal") + assert adapter._last_tier == "normal" + evs = set_status(adapter, 76, "super_refraction") # within deadband of 79 + assert adapter._last_tier == "normal" + assert evs == [] + evs2 = set_status(adapter, 70, "super_refraction") # past deadband + assert evs2[0]["tier"] == "super_refraction" + + +def test_surface_duct_not_held_by_deadband(adapter): + """The categorical surface duct tier fires promptly regardless of deadband.""" + set_status(adapter, 40, "super_refraction") + evs = set_status(adapter, -30, "surface_duct") + assert evs[0]["tier"] == "surface_duct" + + +# ============================================================ +# to_event() — CATEGORY / SEVERITY / KEYS / FIELDS +# ============================================================ + +def test_tier_categories(adapter): + """super-refraction -> anomalous prop; (surface_)duct -> ducting enhancement.""" + assert adapter.to_event(make_ducting_event(tier="super_refraction", severity="routine")).category == "rf_anomalous_propagation" + assert adapter.to_event(make_ducting_event(tier="duct")).category == "rf_ducting_enhancement" + assert adapter.to_event(make_ducting_event(tier="surface_duct", severity="immediate", surface=True)).category == "rf_ducting_enhancement" + + +def test_severity_passes_through(adapter): + for sev in ["routine", "priority", "immediate"]: + assert adapter.to_event(make_ducting_event(severity=sev)).severity == sev + + +def test_group_key_and_inhibit_keys(adapter): + event = adapter.to_event(make_ducting_event(tier="duct")) + assert event.group_key == "ducting_duct_42.56_-114.47" + assert event.inhibit_keys == [event.group_key] + + +def test_populates_core_fields(adapter): + evt = make_ducting_event(tier="duct", lat=42.56, lon=-114.47) + event = adapter.to_event(evt) + assert event.source == "ducting" + assert event.lat == 42.56 + assert event.lon == -114.47 + assert event.expires == evt["expires"] + assert event.timestamp == evt["fetched_at"] + assert event.id # auto-computed + + +# ============================================================ +# DEFENSIVE / NON-EMIT TESTS +# ============================================================ + +def test_normal_tier_returns_none(adapter): + evt = make_ducting_event() + evt["tier"] = "normal" + assert adapter.to_event(evt) is None + + +def test_missing_location_returns_none(adapter): + evt = make_ducting_event() + evt["lat"] = None + assert adapter.to_event(evt) is None + + +def test_missing_gradient_returns_none(adapter): + evt = make_ducting_event() + evt["min_gradient"] = None + assert adapter.to_event(evt) is None + + +def test_does_not_raise_on_corrupted_dict(adapter): + assert adapter.to_event({"garbage": True}) is None