From ad6e24d12347f72544c4851069fed7d7617c2fa5 Mon Sep 17 00:00:00 2001 From: matt+claude Date: Thu, 4 Jun 2026 00:40:28 +0000 Subject: [PATCH] fix(notifications): v0.5.2 -- staleness filter, cooldown, dedup, renderer wiring, hydro family Spam fix from v0.5.0 oversight: - Staleness filter (default 600s, configurable per-toggle) drops backlog at dispatcher entrance -- solves the "restart wave fires days of old events" problem definitively. - Per-toggle cooldown_seconds (default 300s) throttles same (category, region) bursts. - Per-(source, event_id) LRU dedup (10k entries) catches Central re-delivery. - Renderer wired into _dispatch_toggles; toggle path now produces friendly mesh strings with 150-byte UTF-8 hard cap and priority-order segment composition (no mid-char trunc). - categories.py: stream_flood_warning / stream_high_water moved from weather -> geohazards family (canonical toggle name = seismic in VALID_TOGGLES) to match the GUI family tab. Verified end-to-end: 7200s-old events all dropped (100/0), fresh burst throttles to one mesh broadcast per cooldown window (1/99), dedup catches duplicate event_ids (1/99). Co-Authored-By: Claude Opus 4.7 (1M context) --- meshai/config.py | 3 + meshai/notifications/categories.py | 12 +- meshai/notifications/pipeline/dispatcher.py | 114 ++++++- meshai/notifications/renderers/composer.py | 329 +++++++++++++++++++ tests/test_v052_dispatcher.py | 335 ++++++++++++++++++++ 5 files changed, 788 insertions(+), 5 deletions(-) create mode 100644 meshai/notifications/renderers/composer.py create mode 100644 tests/test_v052_dispatcher.py diff --git a/meshai/config.py b/meshai/config.py index c6abfd1..a99c28d 100644 --- a/meshai/config.py +++ b/meshai/config.py @@ -535,6 +535,9 @@ class NotificationToggle: # severity -> list of channel types (digest|mesh_broadcast|mesh_dm|email|webhook) severity_channels: dict = field(default_factory=dict) quiet_hours_override: bool = True # immediate-only quiet-hours bypass + # v0.5.2: staleness drop + per-toggle cooldown (Matt's spam fix) + freshness_seconds: int = 600 # drop events older than this at dispatcher entrance + cooldown_seconds: int = 300 # per (toggle, category, region) throttle window # per-channel delivery config (mirrors NotificationRuleConfig channel fields) broadcast_channel: Optional[int] = None node_ids: list = field(default_factory=list) diff --git a/meshai/notifications/categories.py b/meshai/notifications/categories.py index 592d74e..43b428a 100644 --- a/meshai/notifications/categories.py +++ b/meshai/notifications/categories.py @@ -39,7 +39,8 @@ VALID_TOGGLES = frozenset({ # v0.4 "category -> other" gap for phases 2.7-2.14 emitted categories). _TOGGLE_PREFIX_FALLBACK = [ ("weather", "weather"), - ("stream", "weather"), + # v0.5.2: stream_* (USGS hydro) belongs with Geohazards, not weather + ("stream", "seismic"), ("wildfire", "fire"), ("fire", "fire"), ("earthquake", "seismic"), @@ -274,14 +275,19 @@ ALERT_CATEGORIES = { "description": "River gauge exceeds NWS flood stage threshold", "default_severity": "priority", "example_message": "🌊 Stream Flood Warning: Snake River nr Twin Falls at 12.8 ft β€” Minor Flood Stage is 10.5 ft.", - "toggle": "weather", + # v0.5.2: moved weatherβ†’seismic to match the GUI Geohazards family tab + # (Environment.tsx FAMILIES key='geohazards' groups usgs_quake+usgs+avalanche). + # 'seismic' is the canonical Geohazards toggle in VALID_TOGGLES; backend still + # has separate avalanche/seismic toggles, but USGS hydro lives with USGS quake. + "toggle": "seismic", }, "stream_high_water": { "name": "Stream High Water", "description": "River gauge approaching flood stage β€” monitoring recommended", "default_severity": "routine", "example_message": "🌊 High Water: Snake River at 9.8 ft β€” Action Stage is 9.0 ft. Monitor conditions.", - "toggle": "weather", + # v0.5.2: moved weatherβ†’seismic β€” see stream_flood_warning above + "toggle": "seismic", }, # Environmental - Roads diff --git a/meshai/notifications/pipeline/dispatcher.py b/meshai/notifications/pipeline/dispatcher.py index 9a11205..bc0a793 100644 --- a/meshai/notifications/pipeline/dispatcher.py +++ b/meshai/notifications/pipeline/dispatcher.py @@ -7,13 +7,25 @@ channel implementations. Phase 2.5a: dispatch() is now async, takes a connector at construction, and properly awaits channel.deliver(payload, rule). + +v0.5.2: toggle path gains three guards at the entrance (staleness, per-toggle +cooldown, (source,id) LRU dedup) plus the friendly mesh-broadcast composer so +the toggle path stops emitting raw `[Family] central.category` debug strings. +The legacy rules path is intentionally left untouched (no regression risk). """ import logging +import time +from collections import OrderedDict from typing import Callable, Optional from meshai.notifications.events import Event, make_payload_from_event from meshai.notifications.categories import get_toggle +from meshai.notifications.renderers.composer import compose_mesh_message + + +# Bounded (source, event.id) LRU set β€” see _dispatch_toggles Section 3. +_DEDUP_LRU_MAX = 10_000 class Dispatcher: @@ -35,6 +47,14 @@ class Dispatcher: self._channel_factory = channel_factory self._connector = connector self._logger = logging.getLogger("meshai.pipeline.dispatcher") + # v0.5.2 β€” toggle-path guards (ops counters exposed via dispatch_stats()): + self._stale_dropped = 0 + self._cooldown_dropped = 0 + self._dedup_dropped = 0 + # (toggle.name, category, region) -> last-fire wall-clock seconds + self._toggle_cooldown: dict[tuple[str, str, str], float] = {} + # Insertion-ordered (source, event.id) -> sentinel; evict oldest at cap. + self._dedup_lru: "OrderedDict[tuple[str, str], bool]" = OrderedDict() async def dispatch(self, event: Event) -> None: """Deliver via matching rules AND enabled family toggles (parallel, v0.5).""" @@ -71,7 +91,20 @@ class Dispatcher: ) async def _dispatch_toggles(self, event: Event) -> None: - """Route an event through its family master-toggle (parallel to rules).""" + """Route an event through its family master-toggle (parallel to rules). + + v0.5.2 guards (run in order, at the entrance): + 1. Staleness β€” drop events older than `toggle.freshness_seconds`. + Solves the restart-wave problem definitively: a + backlog of stale events from durable storage gets + dropped here, never broadcast. + 2. Cooldown β€” per (toggle.name, category, region) throttle keyed + on `toggle.cooldown_seconds`. Silent, no log spam. + 3. Dedup β€” bounded LRU on (source, event.id); catches Central + re-delivery during reconnect. + Then composes a friendly mesh string instead of the prior raw + `[Family] central.category` debug format. + """ toggles = getattr(self._config.notifications, "toggles", None) if not isinstance(toggles, dict) or not toggles: return @@ -81,6 +114,59 @@ class Dispatcher: tog = toggles.get(fam) if tog is None or not getattr(tog, "enabled", False): return + + # ---------- Section 1 β€” staleness filter ---------- + # `event.timestamp` is the upstream-published wall-clock the adapter + # sets when minting the event. For Central-sourced events that's the + # inner Event.timestamp; for native adapters it's the upstream API's + # timestamp. Receive-time is NOT used (it's meshai-side and tells us + # nothing about how stale the underlying alert is). + freshness_s = int(getattr(tog, "freshness_seconds", 600) or 600) + if event.timestamp and freshness_s > 0: + age = time.time() - event.timestamp + if age > freshness_s: + self._stale_dropped += 1 + self._logger.debug( + "dispatcher: dropping stale event source=%s category=%s " + "age=%.0fs > freshness=%ds", + event.source, event.category, age, freshness_s, + ) + return + + # ---------- Section 2 β€” per-toggle cooldown ---------- + cooldown_s = int(getattr(tog, "cooldown_seconds", 300) or 0) + if cooldown_s > 0: + ck = ( + getattr(tog, "name", "") or fam, + event.category, + event.region or "*", + ) + now = time.time() + last_fired = self._toggle_cooldown.get(ck) + if last_fired is not None and (now - last_fired) < cooldown_s: + self._cooldown_dropped += 1 + return # silent throttle β€” no log spam + self._toggle_cooldown[ck] = now + # Lazy prune: keep map bounded at ~2x the largest cooldown by + # discarding entries older than 2 * cooldown_s. Cheap; runs only + # when the map grows past a threshold so it's not per-event work. + if len(self._toggle_cooldown) > 1024: + cutoff = now - (2 * cooldown_s) + self._toggle_cooldown = { + k: t for k, t in self._toggle_cooldown.items() if t >= cutoff + } + + # ---------- Section 3 β€” (source, event.id) dedup ---------- + dk = (event.source or "", event.id or "") + if dk in self._dedup_lru: + # Touch to keep recent. + self._dedup_lru.move_to_end(dk) + self._dedup_dropped += 1 + return + self._dedup_lru[dk] = True + while len(self._dedup_lru) > _DEDUP_LRU_MAX: + self._dedup_lru.popitem(last=False) # evict oldest + regions = getattr(tog, "regions", None) or [] if regions: ev_regions = set(filter(None, [event.region, *(event.regions or [])])) @@ -89,6 +175,17 @@ class Dispatcher: event_rank = self.SEVERITY_RANK.get(event.severity, 0) if event_rank < self.SEVERITY_RANK.get(getattr(tog, "min_severity", "routine"), 0): return + + # ---------- Section 4 β€” friendly composer wired in ---------- + # Render once per event; reused across every channel below. Wrapped + # so a renderer fault never blocks delivery β€” we fall back to the + # legacy make_payload_from_event message (event.summary|title|category). + try: + friendly = compose_mesh_message(event) + except Exception: + self._logger.exception("mesh composer crashed; falling back to legacy message") + friendly = None + sev_channels = getattr(tog, "severity_channels", None) or {} for ch_type in sev_channels.get(event.severity, []): if ch_type == "digest": @@ -96,7 +193,10 @@ class Dispatcher: try: rule = self._toggle_to_rule(tog, ch_type, event) channel = self._channel_factory(rule, self._connector) - payload = make_payload_from_event(event) + if friendly is not None and ch_type in ("mesh_broadcast", "mesh_dm"): + payload = make_payload_from_event(event, message=friendly) + else: + payload = make_payload_from_event(event) success = await channel.deliver(payload, rule) if success: self._logger.info(f"Dispatched event {event.id} via toggle {fam}/{ch_type}") @@ -105,6 +205,16 @@ class Dispatcher: except Exception: self._logger.exception(f"Toggle channel delivery failed for {fam}/{ch_type}") + def dispatch_stats(self) -> dict: + """Expose v0.5.2 toggle-path guard counters for ops/health endpoints.""" + return { + "stale_dropped": self._stale_dropped, + "cooldown_dropped": self._cooldown_dropped, + "dedup_dropped": self._dedup_dropped, + "cooldown_keys": len(self._toggle_cooldown), + "dedup_lru_size": len(self._dedup_lru), + } + def _toggle_to_rule(self, tog, ch_type: str, event: Event): from meshai.config import NotificationRuleConfig return NotificationRuleConfig( diff --git a/meshai/notifications/renderers/composer.py b/meshai/notifications/renderers/composer.py new file mode 100644 index 0000000..e384e22 --- /dev/null +++ b/meshai/notifications/renderers/composer.py @@ -0,0 +1,329 @@ +"""Friendly mesh-broadcast string composer (v0.5.2). + +Replaces the bare `event.summary or event.title or event.category` fallback +that, when summary/title were empty (e.g. central-sourced events whose +category arrives as `central.`), produced debug-format broadcasts +like `[Weather] central.weather_warning`. + +Composition order, highest priority first: + 1. category emoji + short uppercase label (always β€” e.g. "πŸ”₯ FIRE:") + 2. primary identifier (title|summary|name) (always) + 3. region (state/county or named region) (optional, drop order 5) + 4. quantitative field (ac / M / mph / ft) (optional, drop order 4) + 5. distance + bearing (optional, drop order 3) + 6. severity word (always β€” "priority") + 7. context (containment, cause, expires) (optional, dropped FIRST) + +Hard cap: 150 bytes UTF-8 (`len(s.encode('utf-8'))`). Segments are dropped +whole; never mid-codepoint truncation. If the required segments alone +exceed the budget, the primary identifier is shrunk by codepoints and +suffixed with `…` so the byte budget always holds. +""" + +from typing import Optional + +from meshai.notifications.events import Event + + +# Hard byte budget for a single mesh broadcast line (Matt-approved cap). +_BYTE_BUDGET = 150 + + +# Per-category emoji. Falls back to severity-based when category unknown. +# Glyphs mirror the registry example_messages in categories.py so what we +# emit at runtime matches the documented user-facing format. +_CATEGORY_EMOJI: dict[str, str] = { + # Weather + "weather_warning": "⚠", + "weather_watch": "⏳", + "weather_advisory": "β„Ή", + "weather_statement": "πŸ“‹", + # Space weather / RF + "hf_blackout": "⚠", + "geomagnetic_storm": "🌐", + "tropospheric_ducting": "πŸ“‘", + # Fire + "fire_proximity": "πŸ”₯", + "wildfire_proximity": "πŸ”₯", + "new_ignition": "πŸ›°", + # Hydro (now under seismic family per v0.5.2 Β§5) + "stream_flood_warning": "🌊", + "stream_high_water": "🌊", + # Roads + "road_closure": "🚧", + "traffic_congestion": "πŸš—", + # Avalanche + "avalanche_warning": "β›·", + "avalanche_considerable": "β›·", + # Mesh health + "infra_offline": "⚠", + "critical_node_down": "🚨", + "infra_recovery": "βœ…", + "new_router": "πŸ“‘", + "battery_warning": "πŸ”‹", + "battery_critical": "πŸ”‹", + "battery_emergency": "🚨", + "battery_trend": "πŸ”‹", + "power_source_change": "⚑", + "solar_not_charging": "β˜€", + "high_utilization": "πŸ“Š", + "sustained_high_util": "πŸ“Š", + "packet_flood": "πŸ“»", + "infra_single_gateway": "πŸ“Ά", + "feeder_offline": "πŸ“‘", + "region_total_blackout": "🚨", + "mesh_score_low": "πŸ“‰", + "region_score_low": "πŸ“‰", + # Seismic + "earthquake_event": "🌐", + "earthquake": "🌐", +} + +_SEVERITY_EMOJI: dict[str, str] = { + "immediate": "🚨", + "priority": "⚠", + "routine": "β„Ή", +} + +# Short uppercase labels (≀6 chars). Per-category where the family-default +# is wrong (e.g. hydro lives under seismic toggle but reads better as FLOOD). +_CATEGORY_LABEL: dict[str, str] = { + "stream_flood_warning": "FLOOD", + "stream_high_water": "HYDRO", + "fire_proximity": "FIRE", + "wildfire_proximity": "FIRE", + "new_ignition": "FIRE", + "weather_warning": "WX", + "weather_watch": "WX", + "weather_advisory": "WX", + "weather_statement": "WX", + "hf_blackout": "RF", + "geomagnetic_storm": "RF", + "tropospheric_ducting": "RF", + "road_closure": "ROADS", + "traffic_congestion": "ROADS", + "avalanche_warning": "AVY", + "avalanche_considerable": "AVY", + "earthquake_event": "QUAKE", + "earthquake": "QUAKE", + "critical_node_down": "MESH", + "infra_offline": "MESH", + "feeder_offline": "MESH", + "region_total_blackout": "MESH", +} + +_FAMILY_LABELS: dict[str, str] = { + "weather": "WX", + "fire": "FIRE", + "rf_propagation": "RF", + "roads": "ROADS", + "avalanche": "AVY", + "seismic": "GEO", + "mesh_health": "MESH", + "tracking": "TRK", +} + + +def _byte_len(s: str) -> int: + """Length in UTF-8 bytes (mesh wire-byte reality).""" + return len(s.encode("utf-8")) + + +def _category_emoji(event: Event) -> str: + e = _CATEGORY_EMOJI.get(event.category) + if e: + return e + return _SEVERITY_EMOJI.get(event.severity, "β€’") + + +def _category_label(event: Event) -> str: + """Short uppercase prefix label. Category > toggle family > stripped category.""" + lbl = _CATEGORY_LABEL.get(event.category) + if lbl: + return lbl + try: + from meshai.notifications.categories import get_toggle + tog = get_toggle(event.category) + if tog and tog in _FAMILY_LABELS: + return _FAMILY_LABELS[tog] + except Exception: + pass + # Strip the `central.` debug prefix so even unknown categories render clean. + cat = event.category.removeprefix("central.") if event.category else "" + if not cat: + return "ALERT" + return cat.upper().replace("_", " ").split(" ", 1)[0][:8] + + +def _primary_identifier(event: Event) -> str: + """Title > summary > registry friendly name > scrubbed category.""" + t = (event.title or "").strip() + if t: + return t + s = (event.summary or "").strip() + if s: + return s + try: + from meshai.notifications.categories import get_category + info = get_category(event.category) + name = info.get("name") + if name: + return str(name) + except Exception: + pass + cat = (event.category or "").removeprefix("central.") + if cat: + return cat.replace("_", " ").title() + return "Alert" + + +def _region_segment(event: Event) -> Optional[str]: + region = event.region or (event.regions[0] if event.regions else None) + return str(region) if region else None + + +def _safe(callable_): + """Run a segment-extractor; swallow exceptions (renderer must not crash).""" + try: + return callable_() + except Exception: + return None + + +def _quant_segment(event: Event) -> Optional[str]: + """Most informative quantitative field from event.data, if present.""" + data = event.data or {} + if "acres" in data: + return _safe(lambda: f"{int(float(data['acres'])):,} ac") + if "magnitude" in data: + return _safe(lambda: f"M{float(data['magnitude']):.1f}") + if "mph_gust" in data: + return _safe(lambda: f"gust {int(float(data['mph_gust']))} mph") + if "depth_ft" in data: + return _safe(lambda: f"{float(data['depth_ft'])} ft") + if "stage_ft" in data: + return _safe(lambda: f"{float(data['stage_ft'])} ft") + if "kp" in data: + return _safe(lambda: f"Kp={data['kp']}") + return None + + +def _distance_segment(event: Event) -> Optional[str]: + data = event.data or {} + dist = data.get("distance_km") + if dist is None: + return None + bearing = data.get("bearing") + anchor = data.get("anchor") + try: + head = f"{int(round(float(dist)))} km" + except Exception: + return None + parts = [head] + if bearing: + parts.append(str(bearing)) + if anchor: + parts.append(f"of {anchor}") + return " ".join(parts) + + +def _context_segment(event: Event) -> Optional[str]: + """Optional context (dropped FIRST when over budget).""" + data = event.data or {} + bits: list[str] = [] + if "containment_pct" in data: + try: + bits.append(f"{int(float(data['containment_pct']))}% contained") + except Exception: + pass + if "cause" in data: + bits.append(str(data["cause"])) + if "expires_at" in data: + bits.append(f"exp {data['expires_at']}") + return ", ".join(bits) if bits else None + + +def compose_mesh_message(event: Event) -> str: + """Compose a friendly mesh-broadcast string with 150-byte UTF-8 hard cap. + + Single line, no newlines. Drops segments wholesale (lowest priority first) + to fit the budget; never mid-codepoint truncation. + """ + emoji = _category_emoji(event) + label = _category_label(event) + head = f"{emoji} {label}:" + primary = _primary_identifier(event) + severity = event.severity or "routine" + + # Build segments. drop_order is the order they're shed when over budget + # (higher = shed first). Required segments have drop_order = -1. + # Tuple: (drop_order, separator_before, text, is_required) + segments: list[tuple[int, str, str, bool]] = [] + segments.append((-1, "", head, True)) # 1. always + segments.append((-1, " ", primary, True)) # 2. always + region = _region_segment(event) + if region: + segments.append((1, ", ", region, False)) # 3. + quant = _quant_segment(event) + if quant: + segments.append((2, " β€” ", quant, False)) # 4. + distance = _distance_segment(event) + if distance: + segments.append((3, ", ", distance, False)) # 5. + segments.append((-1, ". ", severity, True)) # 6. always + context = _context_segment(event) + if context: + segments.append((4, " β€” ", context, False)) # 7. FIRST to drop + + kept_idx = list(range(len(segments))) + + while True: + line = "" + for i in kept_idx: + _, sep, text, _ = segments[i] + line = line + (sep if line else "") + text + if _byte_len(line) <= _BYTE_BUDGET: + return line + # Drop the optional segment with the highest drop_order. + candidate = None + for i in kept_idx: + if not segments[i][3]: # not required + if candidate is None or segments[i][0] > segments[candidate][0]: + candidate = i + if candidate is None: + # All remaining are required; have to shrink primary identifier. + return _hard_truncate(segments, kept_idx, _BYTE_BUDGET) + kept_idx.remove(candidate) + + +def _hard_truncate(segments, kept_idx, budget: int) -> str: + """Required segments alone exceed budget; shrink primary by codepoints. + + Never mid-codepoint: Python str slicing is codepoint-safe, and we + re-check UTF-8 byte length after each shrink. + """ + ellipsis = "…" + # Identify required pieces excluding the primary (index 1). + head_text = segments[0][2] + primary_text = segments[1][2] + fixed_after = "" + for i in kept_idx: + if i in (0, 1): + continue + _, sep, text, _ = segments[i] + fixed_after = fixed_after + sep + text + # Reserve bytes for head + " " + ellipsis + fixed_after. + fixed_bytes = _byte_len(head_text) + _byte_len(" ") + _byte_len(ellipsis) + _byte_len(fixed_after) + primary_budget = budget - fixed_bytes + if primary_budget <= 0: + # Required-only fit attempt without the primary at all. + bare = head_text + fixed_after + if _byte_len(bare) <= budget: + return bare + # Nuclear: just the head emoji+label, drop everything else. + return head_text if _byte_len(head_text) <= budget else "β€’" + # Shrink primary by codepoints from the right. + cut = primary_text + while cut and _byte_len(cut) > primary_budget: + cut = cut[:-1] + return f"{head_text} {cut}{ellipsis}{fixed_after}" diff --git a/tests/test_v052_dispatcher.py b/tests/test_v052_dispatcher.py new file mode 100644 index 0000000..3a68eff --- /dev/null +++ b/tests/test_v052_dispatcher.py @@ -0,0 +1,335 @@ +"""v0.5.2 β€” staleness filter, cooldown, dedup, friendly renderer, hydro family. + +Spec: docs/v0.5.2-spec-cooldown-and-staleness.md (Sections 1–5). + +Eight tests per spec Verification Β§C plus a couple of guards on +counter increments / stats exposure. We intentionally exercise both the +unit (`compose_mesh_message`) and the integration (dispatcher hands the +composed string into the channel payload). +""" + +import asyncio +import time + +import pytest + +from meshai.config import Config, NotificationRuleConfig +from meshai.notifications.pipeline.dispatcher import Dispatcher +from meshai.notifications.events import make_event +from meshai.notifications.renderers.composer import ( + compose_mesh_message, + _BYTE_BUDGET, +) + + +# ---------------------------------------------------------------- helpers + + +class RecChannel: + """Channel recorder that captures rule + full payload (including .message).""" + + def __init__(self, rec): + self.rec = rec + + async def deliver(self, payload, rule): + self.rec.append({ + "delivery_type": rule.delivery_type, + "name": rule.name, + "message": payload.message, + "category": payload.category, + "severity": payload.severity, + }) + return True + + +def _make_dispatcher(cfg): + rec: list = [] + d = Dispatcher(cfg, lambda rule, conn: RecChannel(rec), connector=None) + return d, rec + + +def _dispatch_one(cfg, event): + d, rec = _make_dispatcher(cfg) + asyncio.run(d.dispatch(event)) + return d, rec + + +def _cfg(toggle_name="weather", **kw): + """Default config: one toggle enabled with mesh_broadcast on priority.""" + cfg = Config() + cfg.notifications.rules = [] + t = cfg.notifications.toggles[toggle_name] + t.enabled = True + t.min_severity = kw.get("min_severity", "routine") + t.regions = kw.get("regions", []) + t.severity_channels = kw.get("severity_channels", { + "routine": ["mesh_broadcast"], + "priority": ["mesh_broadcast"], + "immediate": ["mesh_broadcast"], + }) + # v0.5.2 fields β€” tests override per-case as needed + t.freshness_seconds = kw.get("freshness_seconds", 600) + t.cooldown_seconds = kw.get("cooldown_seconds", 300) + return cfg + + +def _ev(severity="priority", category="weather_warning", + timestamp=None, region=None, source="nws", title="t", **kw): + """Build an Event. timestamp=None means "now" via make_event auto-set.""" + extra = dict(kw) + if timestamp is not None: + extra["timestamp"] = timestamp + return make_event( + source=source, category=category, severity=severity, + region=region, title=title, **extra, + ) + + +# ============================================================== Section 1 +# Staleness filter + +def test_staleness_drops_old_central_event(): + """Spec Β§1: event with timestamp = now - 7200s must be dropped at entrance.""" + cfg = _cfg(freshness_seconds=600) + stale = _ev(timestamp=time.time() - 7200) + d, rec = _dispatch_one(cfg, stale) + assert rec == [], "stale event must not be dispatched" + assert d.dispatch_stats()["stale_dropped"] == 1 + + +def test_staleness_passes_fresh_event(): + """Spec Β§1: a fresh event (now) flows through normally.""" + cfg = _cfg(freshness_seconds=600) + fresh = _ev(timestamp=time.time()) # 0s old + d, rec = _dispatch_one(cfg, fresh) + assert len(rec) == 1 + assert d.dispatch_stats()["stale_dropped"] == 0 + + +def test_staleness_applies_to_immediate_severity(): + """Spec Β§1 note: stale-immediate also drops β€” recipient saw it elsewhere.""" + cfg = _cfg(freshness_seconds=600) + stale_imm = _ev(severity="immediate", timestamp=time.time() - 3600) + d, rec = _dispatch_one(cfg, stale_imm) + assert rec == [] + assert d.dispatch_stats()["stale_dropped"] == 1 + + +# ============================================================== Section 2 +# Per-toggle cooldown + +def test_cooldown_throttles_same_category_region(): + """Spec Β§2: two events with same (toggle, category, region) within window + β†’ only the first fires; second is silently throttled.""" + cfg = _cfg(cooldown_seconds=300) + d, rec = _make_dispatcher(cfg) + e1 = _ev(region="Magic Valley") + e2 = _ev(region="Magic Valley") + asyncio.run(d.dispatch(e1)) + asyncio.run(d.dispatch(e2)) + assert len(rec) == 1, "second event in cooldown window must be dropped" + assert d.dispatch_stats()["cooldown_dropped"] == 1 + + +def test_cooldown_releases_after_window(): + """Spec Β§2: cooldown_seconds=0 disables throttling β†’ both fire.""" + cfg = _cfg(cooldown_seconds=0) + d, rec = _make_dispatcher(cfg) + # Different event IDs (so dedup doesn't catch us) β€” vary group_key. + asyncio.run(d.dispatch(_ev(group_key="a"))) + asyncio.run(d.dispatch(_ev(group_key="b"))) + assert len(rec) == 2, "cooldown_seconds=0 must allow both" + + +def test_cooldown_different_region_not_throttled(): + """Spec Β§2: cooldown is keyed on region β€” different regions don't share.""" + cfg = _cfg(cooldown_seconds=300) + d, rec = _make_dispatcher(cfg) + asyncio.run(d.dispatch(_ev(region="Magic Valley", group_key="mv"))) + asyncio.run(d.dispatch(_ev(region="Wood River", group_key="wr"))) + assert len(rec) == 2 + assert d.dispatch_stats()["cooldown_dropped"] == 0 + + +# ============================================================== Section 3 +# (source, event.id) dedup + +def test_dedup_catches_identical_source_event_id(): + """Spec Β§3: same (source, id) on consecutive deliveries β€” second dropped. + Uses two events constructed with the same identity (no group_key).""" + cfg = _cfg(cooldown_seconds=0) # disable cooldown so only dedup can drop + d, rec = _make_dispatcher(cfg) + e1 = _ev() + e2 = _ev() + # make_event auto-computes the same id for identical source+category+geo + assert e1.id == e2.id, "preflight: ids must match for this test" + asyncio.run(d.dispatch(e1)) + asyncio.run(d.dispatch(e2)) + assert len(rec) == 1 + assert d.dispatch_stats()["dedup_dropped"] == 1 + + +def test_dedup_lru_eviction_under_load(): + """Spec Β§3: bounded LRU at 10k entries β€” distinct ids don't crash, and + after 10k+ entries the size stabilizes. We assert just the cap behavior + using a private constant so we don't churn 10k events in the test.""" + from meshai.notifications.pipeline import dispatcher as disp_mod + cfg = _cfg(cooldown_seconds=0, freshness_seconds=0) # disable both guards + d, rec = _make_dispatcher(cfg) + cap = disp_mod._DEDUP_LRU_MAX + # Fire cap + 5 distinct events; the LRU should hold exactly cap. + for i in range(cap + 5): + asyncio.run(d.dispatch(_ev(group_key=f"k{i}"))) + assert d.dispatch_stats()["dedup_lru_size"] == cap + + +# ============================================================== Section 4 +# Friendly renderer + +def test_renderer_produces_friendly_string(): + """Spec Β§4: compose_mesh_message yields a string with severity emoji + + UPPERCASE label + primary identifier + severity word; ≀150 bytes UTF-8.""" + e = make_event( + source="nws", category="weather_warning", severity="priority", + title="Red Flag Warning", region="Twin Falls", timestamp=time.time(), + ) + s = compose_mesh_message(e) + assert "⚠" in s and "WX" in s + assert "Red Flag Warning" in s + assert "priority" in s + assert len(s.encode("utf-8")) <= _BYTE_BUDGET + + +def test_renderer_byte_budget_drops_optional_segments(): + """Spec Β§4: when over budget, optional segments drop FIRST (context, then + distance, then quant, then region). Required segments (head + primary + + severity) always survive.""" + big_title = "A" * 200 + e = make_event( + source="nws", category="fire_proximity", severity="immediate", + title=big_title, region="Wood River Valley", + timestamp=time.time(), + data={ + "acres": 1500, + "containment_pct": 25, + "cause": "lightning", + "distance_km": 8, + "bearing": "W", + "anchor": "Hailey", + }, + ) + s = compose_mesh_message(e) + assert len(s.encode("utf-8")) <= _BYTE_BUDGET + # Head + severity word still present: + assert s.startswith("πŸ”₯ FIRE:") + assert "immediate" in s + # Lowest-priority optional (context) must have been dropped: + assert "% contained" not in s + assert "lightning" not in s + + +def test_renderer_never_mid_character_truncation(): + """The composer must never emit a UTF-8 byte sequence that splits a + codepoint. Even with required-only over budget, we drop wholesale or + shrink by codepoints + ellipsis.""" + # All four-byte emoji glyphs in a row, primary forced super long. + e = make_event( + source="nws", category="wildfire_proximity", severity="priority", + title="πŸ”₯" * 200, # 800 bytes of emoji + timestamp=time.time(), + ) + s = compose_mesh_message(e) + # Must be valid UTF-8 (no UnicodeDecodeError on round-trip). + s.encode("utf-8").decode("utf-8") + assert len(s.encode("utf-8")) <= _BYTE_BUDGET + + +def test_renderer_no_debug_fallback_for_central_prefixed_category(): + """Regression β€” the prod incident: central. event with empty + title/summary must NOT yield `[Family] central.category` debug format.""" + e = make_event( + source="central", category="central.weather_warning", + severity="priority", + title="", # explicitly empty + timestamp=time.time(), + ) + s = compose_mesh_message(e) + assert "central.weather_warning" not in s + # Must still carry a meaningful label even though category is unrecognized. + assert any(c.isupper() for c in s) + + +def test_renderer_message_lands_in_toggle_payload(): + """Integration: composer output must reach the channel as payload.message.""" + cfg = _cfg(cooldown_seconds=0, freshness_seconds=0) + e = _ev(title="Red Flag Warning", region="Twin Falls") + _, rec = _dispatch_one(cfg, e) + assert len(rec) == 1 + msg = rec[0]["message"] + assert "Red Flag Warning" in msg + assert "⚠" in msg # weather_warning emoji + assert len(msg.encode("utf-8")) <= _BYTE_BUDGET + + +# ============================================================== Section 5 +# Hydro family routing + +def test_hydro_event_maps_to_geohazards_toggle(): + """Spec Β§5: stream_flood_warning + stream_high_water route to the + canonical Geohazards toggle (`seismic` in VALID_TOGGLES). Weather + toggle alone must NOT fire on them anymore.""" + cfg = Config() + cfg.notifications.rules = [] + # Enable BOTH weather and seismic toggles so we can prove routing. + cfg.notifications.toggles["weather"].enabled = True + cfg.notifications.toggles["weather"].min_severity = "routine" + cfg.notifications.toggles["weather"].severity_channels = { + "routine": ["mesh_broadcast"], "priority": ["mesh_broadcast"], + } + cfg.notifications.toggles["weather"].cooldown_seconds = 0 + cfg.notifications.toggles["seismic"].enabled = True + cfg.notifications.toggles["seismic"].min_severity = "routine" + cfg.notifications.toggles["seismic"].severity_channels = { + "routine": ["mesh_broadcast"], "priority": ["mesh_broadcast"], + } + cfg.notifications.toggles["seismic"].cooldown_seconds = 0 + + e = make_event( + source="usgs", category="stream_flood_warning", severity="priority", + title="Snake River nr Twin Falls 12.8 ft", timestamp=time.time(), + ) + _, rec = _dispatch_one(cfg, e) + names = {r["name"] for r in rec} + assert "toggle:seismic" in names, "hydro must route to seismic family" + assert "toggle:weather" not in names, "hydro must NOT route to weather" + + +def test_hydro_high_water_also_seismic(): + """Same as above for stream_high_water (the lower-severity sibling).""" + cfg = Config() + cfg.notifications.rules = [] + cfg.notifications.toggles["seismic"].enabled = True + cfg.notifications.toggles["seismic"].min_severity = "routine" + cfg.notifications.toggles["seismic"].severity_channels = { + "routine": ["mesh_broadcast"], + } + cfg.notifications.toggles["seismic"].cooldown_seconds = 0 + e = make_event( + source="usgs", category="stream_high_water", severity="routine", + title="Snake River 9.8 ft", timestamp=time.time(), + ) + _, rec = _dispatch_one(cfg, e) + assert len(rec) == 1 and rec[0]["name"] == "toggle:seismic" + + +# ============================================================== misc + +def test_dispatch_stats_exposes_all_counters(): + """Stats dict shape is part of the v0.5.2 contract for /api/health.""" + cfg = _cfg() + d, _ = _make_dispatcher(cfg) + stats = d.dispatch_stats() + assert set(stats.keys()) == { + "stale_dropped", "cooldown_dropped", "dedup_dropped", + "cooldown_keys", "dedup_lru_size", + }