feat(notifications): Phase 2.3a digest accumulator and renderer

Adds DigestAccumulator tracking ACTIVE NOW and SINCE LAST DIGEST
state per toggle. Replaces StubDigestQueue in build_pipeline; the
stub class is kept for Phase 2.1 backward-compat tests.

- enqueue(): adds new events, updates in place by id, detects
  resolutions (expires past, or title contains cleared/reopened/
  ended/resolved/back online/recovered/lifted)
- tick(now): rolls expired actives into since_last
- render_digest(now): produces a Digest with mesh_compact (<=200
  chars) and full multi-line forms; clears since_last after
- Toggle ordering and labels match the v0.3 design
- Phase 2.3b will add real scheduling on top of this
This commit is contained in:
K7ZVX 2026-05-14 19:21:40 +00:00
commit 96de22c6c0
3 changed files with 766 additions and 75 deletions

View file

@ -1,75 +1,74 @@
"""Notification pipeline package.
Phase 2.1 + 2.2:
- EventBus: pub/sub ingress
- Inhibitor: suppresses redundant events by inhibit_keys
- Grouper: coalesces events sharing group_key within a window
- SeverityRouter: forks immediate vs digest
- Dispatcher: routes immediate via channels (existing rules schema)
- StubDigestQueue: placeholder for Phase 2.3
Usage:
from meshai.notifications.pipeline import build_pipeline
bus = build_pipeline(config)
bus.emit(event)
"""
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue,
)
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
def build_pipeline(config) -> EventBus:
"""Build the pipeline and return the EventBus.
Wiring:
bus -> inhibitor -> grouper -> severity_router -> (dispatcher | digest_stub)
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
digest = StubDigestQueue()
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus
def build_pipeline_components(config) -> tuple:
"""Like build_pipeline, but returns all components for test inspection.
Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
digest = StubDigestQueue()
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus, inhibitor, grouper, severity_router, dispatcher, digest
__all__ = [
"EventBus",
"SeverityRouter",
"StubDigestQueue",
"Dispatcher",
"Inhibitor",
"Grouper",
"build_pipeline",
"build_pipeline_components",
"get_bus",
]
"""Notification pipeline package.
Phase 2.1 + 2.2 + 2.3a:
- EventBus: pub/sub ingress
- Inhibitor: suppresses redundant events by inhibit_keys
- Grouper: coalesces events sharing group_key within a window
- SeverityRouter: forks immediate vs digest
- Dispatcher: routes immediate via channels (existing rules schema)
- DigestAccumulator: tracks priority/routine events for periodic digest
Usage:
from meshai.notifications.pipeline import build_pipeline
bus = build_pipeline(config)
bus.emit(event)
"""
from meshai.notifications.channels import create_channel
from meshai.notifications.pipeline.bus import EventBus, get_bus
from meshai.notifications.pipeline.severity_router import (
SeverityRouter,
StubDigestQueue, # kept for Phase 2.1 backward-compat tests
)
from meshai.notifications.pipeline.dispatcher import Dispatcher
from meshai.notifications.pipeline.inhibitor import Inhibitor
from meshai.notifications.pipeline.grouper import Grouper
from meshai.notifications.pipeline.digest import DigestAccumulator, Digest
def build_pipeline(config) -> EventBus:
"""Build the pipeline and return the EventBus."""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
digest = DigestAccumulator()
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus
def build_pipeline_components(config) -> tuple:
"""Like build_pipeline, but returns all components for tests.
Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest).
"""
bus = EventBus()
dispatcher = Dispatcher(config, create_channel)
digest = DigestAccumulator()
severity_router = SeverityRouter(
immediate_handler=dispatcher.dispatch,
digest_handler=digest.enqueue,
)
grouper = Grouper(next_handler=severity_router.handle)
inhibitor = Inhibitor(next_handler=grouper.handle)
bus.subscribe(inhibitor.handle)
return bus, inhibitor, grouper, severity_router, dispatcher, digest
__all__ = [
"EventBus",
"SeverityRouter",
"StubDigestQueue",
"Dispatcher",
"Inhibitor",
"Grouper",
"DigestAccumulator",
"Digest",
"build_pipeline",
"build_pipeline_components",
"get_bus",
]

View file

@ -0,0 +1,297 @@
"""Digest accumulator and renderer for Phase 2.3a.
Holds priority and routine events between digest emissions, tracks
active vs recently-resolved events, and renders the two-section
digest output (ACTIVE NOW + SINCE LAST DIGEST) when called.
No scheduling logic here. render_digest() is called explicitly by
the future scheduler (Phase 2.3b) or by tests.
"""
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from meshai.notifications.events import Event
from meshai.notifications.categories import get_toggle
# Lowercase substrings in event.title that indicate the event is
# a resolution of a prior alert. Conservative list — easy to extend.
RESOLUTION_MARKERS = (
"cleared",
"reopened",
"ended",
"resolved",
"back online",
"recovered",
"lifted",
)
# Display labels per toggle (used in rendered output)
TOGGLE_LABELS = {
"mesh_health": "Mesh",
"weather": "Weather",
"fire": "Fire",
"rf_propagation": "RF",
"roads": "Roads",
"avalanche": "Avalanche",
"seismic": "Seismic",
"tracking": "Tracking",
"other": "Other",
}
# Toggle sort order in digest output (most operationally urgent first)
TOGGLE_ORDER = [
"weather",
"fire",
"seismic",
"avalanche",
"roads",
"rf_propagation",
"mesh_health",
"tracking",
"other",
]
@dataclass
class Digest:
"""Result of render_digest(). Carries both sections and metadata."""
rendered_at: float
active: dict[str, list[Event]] = field(default_factory=dict)
since_last: dict[str, list[Event]] = field(default_factory=dict)
mesh_compact: str = ""
full: str = ""
def is_empty(self) -> bool:
return not self.active and not self.since_last
class DigestAccumulator:
"""Tracks priority/routine events and produces periodic digests."""
def __init__(self, mesh_char_limit: int = 200):
self._active: dict[str, list[Event]] = {} # toggle -> events
self._since_last: dict[str, list[Event]] = {} # toggle -> events
self._last_digest_at: float = 0.0
self._mesh_char_limit = mesh_char_limit
self._logger = logging.getLogger("meshai.pipeline.digest")
# ---- ingress ----
def enqueue(self, event: Event) -> None:
"""SeverityRouter calls this for priority/routine events."""
toggle = get_toggle(event.category) or "other"
active_for_toggle = self._active.setdefault(toggle, [])
# Resolution detection
if self._is_resolution(event, self._now()):
self._move_to_since_last_by_group(event, toggle)
return
# In-place update if same id
for i, existing in enumerate(active_for_toggle):
if existing.id == event.id:
active_for_toggle[i] = event
self._logger.debug(
f"UPDATED active event {event.id} in {toggle}"
)
return
# Otherwise it's a new active event
active_for_toggle.append(event)
self._logger.debug(
f"ADDED active event {event.id} ({toggle}/{event.category})"
)
def tick(self, now: Optional[float] = None) -> int:
"""Move expired events from active to since_last.
Returns the number of events moved.
"""
if now is None:
now = self._now()
moved = 0
for toggle in list(self._active.keys()):
still_active = []
for ev in self._active[toggle]:
if ev.expires is not None and ev.expires <= now:
self._since_last.setdefault(toggle, []).append(ev)
moved += 1
else:
still_active.append(ev)
self._active[toggle] = still_active
return moved
# ---- rendering ----
def render_digest(self, now: Optional[float] = None) -> Digest:
"""Produce a Digest of current state, then clear since_last."""
if now is None:
now = self._now()
# tick() first so expired actives roll into since_last
self.tick(now)
digest = Digest(rendered_at=now)
digest.active = {k: list(v) for k, v in self._active.items() if v}
digest.since_last = {k: list(v) for k, v in self._since_last.items() if v}
digest.mesh_compact = self._render_mesh_compact(digest, now)
digest.full = self._render_full(digest, now)
# Clear since_last; active stays for the next cycle
self._since_last.clear()
self._last_digest_at = now
return digest
def _render_mesh_compact(self, digest: Digest, now: float) -> str:
"""Produce a mesh-radio-friendly compact form.
Format:
DIGEST 0700
ACTIVE: 2 weather, 1 fire, 1 mesh
NEW: 1 roads, 1 weather cleared
Fits under self._mesh_char_limit chars. If it overflows,
truncate by dropping toggles with fewest events first.
"""
lines = [f"DIGEST {time.strftime('%H%M', time.localtime(now))}"]
if digest.active:
counts = self._compact_counts(digest.active)
lines.append(f"ACTIVE: {counts}")
if digest.since_last:
counts = self._compact_counts(digest.since_last)
lines.append(f"NEW: {counts}")
if not digest.active and not digest.since_last:
lines.append("All quiet.")
out = "\n".join(lines)
if len(out) > self._mesh_char_limit:
out = out[: self._mesh_char_limit - 1] + ""
return out
def _compact_counts(self, section: dict[str, list[Event]]) -> str:
"""e.g. '2 weather, 1 fire, 1 mesh'"""
parts = []
for toggle in TOGGLE_ORDER:
events = section.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle).lower()
parts.append(f"{len(events)} {label}")
return ", ".join(parts)
def _render_full(self, digest: Digest, now: float) -> str:
"""Produce the full multi-line digest for email/webhook."""
lines = [
f"--- {time.strftime('%H%M', time.localtime(now))} Digest ---",
"",
]
if digest.active:
lines.append("ACTIVE NOW:")
for toggle in TOGGLE_ORDER:
events = digest.active.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
else:
lines.append("ACTIVE NOW: nothing")
lines.append("")
if digest.since_last:
lines.append("SINCE LAST DIGEST:")
for toggle in TOGGLE_ORDER:
events = digest.since_last.get(toggle)
if not events:
continue
label = TOGGLE_LABELS.get(toggle, toggle)
for ev in self._sort_events(events):
lines.append(f" [{label}] {self._format_event_line(ev)}")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def _format_event_line(self, event: Event) -> str:
"""Single-line summary of an event for digest output."""
# Prefer event.summary if set, else fall back to title
text = event.summary or event.title or event.category
# Append expires hint if available
if event.expires is not None and event.expires > self._now():
try:
expires_str = time.strftime(
"%H:%M", time.localtime(event.expires)
)
text = f"{text} (until {expires_str})"
except (ValueError, OverflowError):
pass
# Trim runaway text — keep digest readable
if len(text) > 140:
text = text[:139] + ""
return text
def _sort_events(self, events: list[Event]) -> list[Event]:
"""Sort within a toggle: immediate first, then priority,
then routine, then by timestamp newest first."""
rank = {"immediate": 0, "priority": 1, "routine": 2}
return sorted(
events,
key=lambda e: (rank.get(e.severity, 3), -e.timestamp),
)
# ---- helpers ----
def _is_resolution(self, event: Event, now: float) -> bool:
if event.expires is not None and event.expires <= now:
return True
title_lc = (event.title or "").lower()
return any(marker in title_lc for marker in RESOLUTION_MARKERS)
def _move_to_since_last_by_group(self, event: Event, toggle: str) -> None:
"""Remove any active event matching event's group_key (or id)
and place this resolution event into since_last.
"""
active_list = self._active.get(toggle, [])
# Match by group_key if set, else by id
match_key = event.group_key
if match_key:
self._active[toggle] = [
e for e in active_list
if e.group_key != match_key
]
else:
self._active[toggle] = [
e for e in active_list if e.id != event.id
]
self._since_last.setdefault(toggle, []).append(event)
self._logger.debug(
f"RESOLVED in {toggle}: {event.id} ({event.title!r})"
)
def _now(self) -> float:
return time.time()
# ---- inspection (for tests and future scheduler) ----
def active_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._active.get(toggle, []))
return sum(len(v) for v in self._active.values())
def since_last_count(self, toggle: Optional[str] = None) -> int:
if toggle is not None:
return len(self._since_last.get(toggle, []))
return sum(len(v) for v in self._since_last.values())
def last_digest_at(self) -> float:
return self._last_digest_at
def clear(self) -> None:
self._active.clear()
self._since_last.clear()
self._last_digest_at = 0.0

View file

@ -0,0 +1,395 @@
"""Tests for Phase 2.3a DigestAccumulator.
13 tests covering:
- Accumulator active/since_last behavior (6 tests)
- Renderer output (6 tests)
- Pipeline integration (1 test already covered, plus 2 more)
"""
import time
from unittest.mock import MagicMock, patch
import pytest
from meshai.notifications.events import make_event
from meshai.notifications.pipeline import (
build_pipeline_components,
DigestAccumulator,
Digest,
)
from meshai.config import Config
# ============================================================
# ACCUMULATOR ACTIVE/SINCE_LAST TESTS
# ============================================================
def test_enqueue_adds_to_active():
"""Enqueue one routine Event with no expires → active_count == 1."""
acc = DigestAccumulator()
event = make_event(
source="test",
category="weather_warning",
severity="routine",
title="Wind Advisory",
)
acc.enqueue(event)
assert acc.active_count() == 1
assert acc.since_last_count() == 0
def test_enqueue_same_id_updates_in_place():
"""Enqueue same id twice → still 1 active, title updated."""
acc = DigestAccumulator()
event1 = make_event(
source="test",
category="weather_warning",
severity="routine",
id="abc",
title="initial",
)
event2 = make_event(
source="test",
category="weather_warning",
severity="routine",
id="abc",
title="updated",
)
acc.enqueue(event1)
acc.enqueue(event2)
assert acc.active_count() == 1
# Check the held event's title
toggle = "weather"
events = acc._active.get(toggle, [])
assert len(events) == 1
assert events[0].title == "updated"
def test_two_different_ids_both_active():
"""Two different routine events → both active."""
acc = DigestAccumulator()
event1 = make_event(
source="test",
category="weather_warning",
severity="routine",
id="ev1",
title="Event 1",
)
event2 = make_event(
source="test",
category="weather_warning",
severity="routine",
id="ev2",
title="Event 2",
)
acc.enqueue(event1)
acc.enqueue(event2)
assert acc.active_count() == 2
def test_resolution_marker_in_title_moves_active_to_since_last():
"""Resolution marker in title moves matching active to since_last."""
acc = DigestAccumulator()
event1 = make_event(
source="test",
category="wildfire_proximity",
severity="priority",
group_key="fire:42",
title="Snake River Fire",
)
acc.enqueue(event1)
assert acc.active_count() == 1
assert acc.since_last_count() == 0
event2 = make_event(
source="test",
category="wildfire_proximity",
severity="priority",
group_key="fire:42",
title="Snake River Fire ended",
)
acc.enqueue(event2)
assert acc.active_count() == 0
assert acc.since_last_count() == 1
def test_expired_event_via_tick_moves_to_since_last():
"""tick() moves expired events from active to since_last."""
acc = DigestAccumulator()
base_time = 1000000.0
# Monkeypatch _now to control time
acc._now = lambda: base_time
event = make_event(
source="test",
category="weather_warning",
severity="routine",
title="Temporary Warning",
expires=base_time + 60, # expires in 60 seconds
)
acc.enqueue(event)
assert acc.active_count() == 1
assert acc.since_last_count() == 0
# Tick at base_time + 30 → still active
moved = acc.tick(now=base_time + 30)
assert moved == 0
assert acc.active_count() == 1
# Tick at base_time + 120 → expired, moved to since_last
moved = acc.tick(now=base_time + 120)
assert moved == 1
assert acc.active_count() == 0
assert acc.since_last_count() == 1
def test_render_digest_clears_since_last_but_keeps_active():
"""render_digest() clears since_last but preserves active."""
acc = DigestAccumulator()
base_time = 1000000.0
acc._now = lambda: base_time
# Add an active event
active_event = make_event(
source="test",
category="weather_warning",
severity="routine",
title="Ongoing Storm",
)
acc.enqueue(active_event)
# Add an event that becomes since_last via resolution marker
resolved_event = make_event(
source="test",
category="road_closure",
severity="routine",
group_key="roads:99",
title="US-93 reopened at MP 47",
)
acc.enqueue(resolved_event)
# Now we should have 1 active, 1 since_last
assert acc.active_count() == 1
assert acc.since_last_count() == 1
# Render digest
digest = acc.render_digest(now=base_time)
assert len(digest.active) > 0
assert len(digest.since_last) > 0
# After render: active preserved, since_last cleared
assert acc.active_count() == 1
assert acc.since_last_count() == 0
# Second render has only active
digest2 = acc.render_digest(now=base_time + 10)
assert len(digest2.active) > 0
assert len(digest2.since_last) == 0
# ============================================================
# RENDERER TESTS
# ============================================================
def test_render_full_lists_active_and_since_last_with_labels():
"""Full render includes ACTIVE NOW, SINCE LAST DIGEST, toggle labels."""
acc = DigestAccumulator()
base_time = 1000000.0
acc._now = lambda: base_time
# Weather event (active)
weather_event = make_event(
source="test",
category="weather_warning",
severity="routine",
title="Wind Advisory until 21:00",
)
acc.enqueue(weather_event)
# Roads event with resolution marker → since_last
roads_event = make_event(
source="test",
category="road_closure",
severity="routine",
title="US-93 reopened at MP 47",
)
acc.enqueue(roads_event)
digest = acc.render_digest(now=base_time)
assert "ACTIVE NOW:" in digest.full
assert "[Weather]" in digest.full
assert "Wind Advisory" in digest.full
assert "SINCE LAST DIGEST:" in digest.full
assert "[Roads]" in digest.full
assert "US-93" in digest.full
def test_render_mesh_compact_under_char_limit():
"""mesh_compact is <= 200 chars with proper format."""
acc = DigestAccumulator()
base_time = 1000000.0
acc._now = lambda: base_time
# Add 10 events across 4 toggles
categories = [
("weather_warning", "Weather Event"),
("weather_warning", "Weather Event 2"),
("weather_warning", "Weather Event 3"),
("wildfire_proximity", "Fire Event"),
("wildfire_proximity", "Fire Event 2"),
("battery_warning", "Mesh Event"),
("battery_warning", "Mesh Event 2"),
("battery_warning", "Mesh Event 3"),
("road_closure", "Road Event"),
("road_closure", "Road Event 2"),
]
for i, (cat, title) in enumerate(categories):
event = make_event(
source="test",
category=cat,
severity="routine",
id=f"ev{i}",
title=title,
)
acc.enqueue(event)
digest = acc.render_digest(now=base_time)
assert len(digest.mesh_compact) <= 200
assert digest.mesh_compact.startswith("DIGEST ")
assert "ACTIVE:" in digest.mesh_compact
def test_render_mesh_compact_all_quiet_when_empty():
"""Empty accumulator renders 'All quiet.' in mesh_compact."""
acc = DigestAccumulator()
base_time = 1000000.0
acc._now = lambda: base_time
digest = acc.render_digest(now=base_time)
assert "All quiet" in digest.mesh_compact
def test_render_full_handles_empty_accumulator():
"""Empty accumulator → is_empty() True, 'ACTIVE NOW: nothing'."""
acc = DigestAccumulator()
base_time = 1000000.0
acc._now = lambda: base_time
digest = acc.render_digest(now=base_time)
assert digest.is_empty() is True
assert "ACTIVE NOW: nothing" in digest.full
def test_render_orders_toggles_by_priority():
"""Toggles appear in TOGGLE_ORDER sequence in full output."""
acc = DigestAccumulator()
base_time = 1000000.0
acc._now = lambda: base_time
# Add one event each for weather, mesh_health, and fire
# (intentionally out of order)
mesh_event = make_event(
source="test",
category="battery_warning", # maps to mesh_health toggle
severity="routine",
title="Mesh battery low",
)
fire_event = make_event(
source="test",
category="wildfire_proximity",
severity="routine",
title="Fire nearby",
)
weather_event = make_event(
source="test",
category="weather_warning",
severity="routine",
title="Storm coming",
)
acc.enqueue(mesh_event)
acc.enqueue(fire_event)
acc.enqueue(weather_event)
digest = acc.render_digest(now=base_time)
# In TOGGLE_ORDER: weather, fire, ..., mesh_health
weather_pos = digest.full.find("[Weather]")
fire_pos = digest.full.find("[Fire]")
mesh_pos = digest.full.find("[Mesh]")
assert weather_pos < fire_pos, "Weather should appear before Fire"
assert fire_pos < mesh_pos, "Fire should appear before Mesh"
def test_format_event_line_appends_expires_hint():
"""_format_event_line() appends '(until HH:MM)' for future expires."""
acc = DigestAccumulator()
base_time = 1000000.0
acc._now = lambda: base_time
event = make_event(
source="test",
category="weather_warning",
severity="routine",
title="Severe Thunderstorm Warning",
expires=base_time + 3600, # 1 hour in future
)
line = acc._format_event_line(event)
assert "(until " in line
# Should have time in HH:MM format
assert ":" in line.split("(until ")[-1]
# ============================================================
# PIPELINE INTEGRATION TESTS
# ============================================================
def test_pipeline_routes_routine_event_to_accumulator():
"""Routine event via bus.emit ends up in DigestAccumulator."""
config = Config()
bus, inhibitor, grouper, severity_router, dispatcher, digest = \
build_pipeline_components(config)
event = make_event(
source="test",
category="weather_warning",
severity="routine",
title="Test routine event",
)
# Flush through grouper
grouper.flush_all()
bus.emit(event)
grouper.flush_all()
assert digest.active_count() == 1
def test_pipeline_routes_immediate_event_to_dispatcher_not_accumulator():
"""Immediate event goes to dispatcher, not accumulator."""
config = Config()
bus, inhibitor, grouper, severity_router, dispatcher, digest = \
build_pipeline_components(config)
# Mock the severity_router's immediate handler (already bound to dispatcher.dispatch)
mock_immediate = MagicMock()
severity_router._immediate = mock_immediate
event = make_event(
source="test",
category="weather_warning",
severity="immediate",
title="Test immediate event",
)
grouper.flush_all()
bus.emit(event)
grouper.flush_all()
# Immediate handler should have been called
assert mock_immediate.called
# Accumulator should have nothing
assert digest.active_count() == 0