From 96de22c6c0849075566e090c4ed2c1eb76d8b626 Mon Sep 17 00:00:00 2001 From: K7ZVX Date: Thu, 14 May 2026 19:21:40 +0000 Subject: [PATCH] feat(notifications): Phase 2.3a digest accumulator and renderer Adds DigestAccumulator tracking ACTIVE NOW and SINCE LAST DIGEST state per toggle. Replaces StubDigestQueue in build_pipeline; the stub class is kept for Phase 2.1 backward-compat tests. - enqueue(): adds new events, updates in place by id, detects resolutions (expires past, or title contains cleared/reopened/ ended/resolved/back online/recovered/lifted) - tick(now): rolls expired actives into since_last - render_digest(now): produces a Digest with mesh_compact (<=200 chars) and full multi-line forms; clears since_last after - Toggle ordering and labels match the v0.3 design - Phase 2.3b will add real scheduling on top of this --- meshai/notifications/pipeline/__init__.py | 149 ++++---- meshai/notifications/pipeline/digest.py | 297 ++++++++++++++++ tests/test_pipeline_digest.py | 395 ++++++++++++++++++++++ 3 files changed, 766 insertions(+), 75 deletions(-) create mode 100644 meshai/notifications/pipeline/digest.py create mode 100644 tests/test_pipeline_digest.py diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index 3e14c34..e90f5b6 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -1,75 +1,74 @@ -"""Notification pipeline package. - -Phase 2.1 + 2.2: - - EventBus: pub/sub ingress - - Inhibitor: suppresses redundant events by inhibit_keys - - Grouper: coalesces events sharing group_key within a window - - SeverityRouter: forks immediate vs digest - - Dispatcher: routes immediate via channels (existing rules schema) - - StubDigestQueue: placeholder for Phase 2.3 - -Usage: - from meshai.notifications.pipeline import build_pipeline - bus = build_pipeline(config) - bus.emit(event) -""" - -from meshai.notifications.channels import create_channel -from meshai.notifications.pipeline.bus import EventBus, get_bus -from meshai.notifications.pipeline.severity_router import ( - SeverityRouter, - StubDigestQueue, -) -from meshai.notifications.pipeline.dispatcher import Dispatcher -from meshai.notifications.pipeline.inhibitor import Inhibitor -from meshai.notifications.pipeline.grouper import Grouper - - -def build_pipeline(config) -> EventBus: - """Build the pipeline and return the EventBus. - - Wiring: - bus -> inhibitor -> grouper -> severity_router -> (dispatcher | digest_stub) - """ - bus = EventBus() - dispatcher = Dispatcher(config, create_channel) - digest = StubDigestQueue() - severity_router = SeverityRouter( - immediate_handler=dispatcher.dispatch, - digest_handler=digest.enqueue, - ) - grouper = Grouper(next_handler=severity_router.handle) - inhibitor = Inhibitor(next_handler=grouper.handle) - bus.subscribe(inhibitor.handle) - return bus - - -def build_pipeline_components(config) -> tuple: - """Like build_pipeline, but returns all components for test inspection. - - Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest). - """ - bus = EventBus() - dispatcher = Dispatcher(config, create_channel) - digest = StubDigestQueue() - severity_router = SeverityRouter( - immediate_handler=dispatcher.dispatch, - digest_handler=digest.enqueue, - ) - grouper = Grouper(next_handler=severity_router.handle) - inhibitor = Inhibitor(next_handler=grouper.handle) - bus.subscribe(inhibitor.handle) - return bus, inhibitor, grouper, severity_router, dispatcher, digest - - -__all__ = [ - "EventBus", - "SeverityRouter", - "StubDigestQueue", - "Dispatcher", - "Inhibitor", - "Grouper", - "build_pipeline", - "build_pipeline_components", - "get_bus", -] +"""Notification pipeline package. + +Phase 2.1 + 2.2 + 2.3a: + - EventBus: pub/sub ingress + - Inhibitor: suppresses redundant events by inhibit_keys + - Grouper: coalesces events sharing group_key within a window + - SeverityRouter: forks immediate vs digest + - Dispatcher: routes immediate via channels (existing rules schema) + - DigestAccumulator: tracks priority/routine events for periodic digest + +Usage: + from meshai.notifications.pipeline import build_pipeline + bus = build_pipeline(config) + bus.emit(event) +""" + +from meshai.notifications.channels import create_channel +from meshai.notifications.pipeline.bus import EventBus, get_bus +from meshai.notifications.pipeline.severity_router import ( + SeverityRouter, + StubDigestQueue, # kept for Phase 2.1 backward-compat tests +) +from meshai.notifications.pipeline.dispatcher import Dispatcher +from meshai.notifications.pipeline.inhibitor import Inhibitor +from meshai.notifications.pipeline.grouper import Grouper +from meshai.notifications.pipeline.digest import DigestAccumulator, Digest + + +def build_pipeline(config) -> EventBus: + """Build the pipeline and return the EventBus.""" + bus = EventBus() + dispatcher = Dispatcher(config, create_channel) + digest = DigestAccumulator() + severity_router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest.enqueue, + ) + grouper = Grouper(next_handler=severity_router.handle) + inhibitor = Inhibitor(next_handler=grouper.handle) + bus.subscribe(inhibitor.handle) + return bus + + +def build_pipeline_components(config) -> tuple: + """Like build_pipeline, but returns all components for tests. + + Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest). + """ + bus = EventBus() + dispatcher = Dispatcher(config, create_channel) + digest = DigestAccumulator() + severity_router = SeverityRouter( + immediate_handler=dispatcher.dispatch, + digest_handler=digest.enqueue, + ) + grouper = Grouper(next_handler=severity_router.handle) + inhibitor = Inhibitor(next_handler=grouper.handle) + bus.subscribe(inhibitor.handle) + return bus, inhibitor, grouper, severity_router, dispatcher, digest + + +__all__ = [ + "EventBus", + "SeverityRouter", + "StubDigestQueue", + "Dispatcher", + "Inhibitor", + "Grouper", + "DigestAccumulator", + "Digest", + "build_pipeline", + "build_pipeline_components", + "get_bus", +] diff --git a/meshai/notifications/pipeline/digest.py b/meshai/notifications/pipeline/digest.py new file mode 100644 index 0000000..571307d --- /dev/null +++ b/meshai/notifications/pipeline/digest.py @@ -0,0 +1,297 @@ +"""Digest accumulator and renderer for Phase 2.3a. + +Holds priority and routine events between digest emissions, tracks +active vs recently-resolved events, and renders the two-section +digest output (ACTIVE NOW + SINCE LAST DIGEST) when called. + +No scheduling logic here. render_digest() is called explicitly by +the future scheduler (Phase 2.3b) or by tests. +""" + +import logging +import time +from dataclasses import dataclass, field +from typing import Optional + +from meshai.notifications.events import Event +from meshai.notifications.categories import get_toggle + + +# Lowercase substrings in event.title that indicate the event is +# a resolution of a prior alert. Conservative list — easy to extend. +RESOLUTION_MARKERS = ( + "cleared", + "reopened", + "ended", + "resolved", + "back online", + "recovered", + "lifted", +) + +# Display labels per toggle (used in rendered output) +TOGGLE_LABELS = { + "mesh_health": "Mesh", + "weather": "Weather", + "fire": "Fire", + "rf_propagation": "RF", + "roads": "Roads", + "avalanche": "Avalanche", + "seismic": "Seismic", + "tracking": "Tracking", + "other": "Other", +} + +# Toggle sort order in digest output (most operationally urgent first) +TOGGLE_ORDER = [ + "weather", + "fire", + "seismic", + "avalanche", + "roads", + "rf_propagation", + "mesh_health", + "tracking", + "other", +] + + +@dataclass +class Digest: + """Result of render_digest(). Carries both sections and metadata.""" + rendered_at: float + active: dict[str, list[Event]] = field(default_factory=dict) + since_last: dict[str, list[Event]] = field(default_factory=dict) + mesh_compact: str = "" + full: str = "" + + def is_empty(self) -> bool: + return not self.active and not self.since_last + + +class DigestAccumulator: + """Tracks priority/routine events and produces periodic digests.""" + + def __init__(self, mesh_char_limit: int = 200): + self._active: dict[str, list[Event]] = {} # toggle -> events + self._since_last: dict[str, list[Event]] = {} # toggle -> events + self._last_digest_at: float = 0.0 + self._mesh_char_limit = mesh_char_limit + self._logger = logging.getLogger("meshai.pipeline.digest") + + # ---- ingress ---- + + def enqueue(self, event: Event) -> None: + """SeverityRouter calls this for priority/routine events.""" + toggle = get_toggle(event.category) or "other" + active_for_toggle = self._active.setdefault(toggle, []) + + # Resolution detection + if self._is_resolution(event, self._now()): + self._move_to_since_last_by_group(event, toggle) + return + + # In-place update if same id + for i, existing in enumerate(active_for_toggle): + if existing.id == event.id: + active_for_toggle[i] = event + self._logger.debug( + f"UPDATED active event {event.id} in {toggle}" + ) + return + + # Otherwise it's a new active event + active_for_toggle.append(event) + self._logger.debug( + f"ADDED active event {event.id} ({toggle}/{event.category})" + ) + + def tick(self, now: Optional[float] = None) -> int: + """Move expired events from active to since_last. + + Returns the number of events moved. + """ + if now is None: + now = self._now() + moved = 0 + for toggle in list(self._active.keys()): + still_active = [] + for ev in self._active[toggle]: + if ev.expires is not None and ev.expires <= now: + self._since_last.setdefault(toggle, []).append(ev) + moved += 1 + else: + still_active.append(ev) + self._active[toggle] = still_active + return moved + + # ---- rendering ---- + + def render_digest(self, now: Optional[float] = None) -> Digest: + """Produce a Digest of current state, then clear since_last.""" + if now is None: + now = self._now() + # tick() first so expired actives roll into since_last + self.tick(now) + + digest = Digest(rendered_at=now) + digest.active = {k: list(v) for k, v in self._active.items() if v} + digest.since_last = {k: list(v) for k, v in self._since_last.items() if v} + digest.mesh_compact = self._render_mesh_compact(digest, now) + digest.full = self._render_full(digest, now) + + # Clear since_last; active stays for the next cycle + self._since_last.clear() + self._last_digest_at = now + return digest + + def _render_mesh_compact(self, digest: Digest, now: float) -> str: + """Produce a mesh-radio-friendly compact form. + + Format: + DIGEST 0700 + ACTIVE: 2 weather, 1 fire, 1 mesh + NEW: 1 roads, 1 weather cleared + Fits under self._mesh_char_limit chars. If it overflows, + truncate by dropping toggles with fewest events first. + """ + lines = [f"DIGEST {time.strftime('%H%M', time.localtime(now))}"] + + if digest.active: + counts = self._compact_counts(digest.active) + lines.append(f"ACTIVE: {counts}") + if digest.since_last: + counts = self._compact_counts(digest.since_last) + lines.append(f"NEW: {counts}") + + if not digest.active and not digest.since_last: + lines.append("All quiet.") + + out = "\n".join(lines) + if len(out) > self._mesh_char_limit: + out = out[: self._mesh_char_limit - 1] + "…" + return out + + def _compact_counts(self, section: dict[str, list[Event]]) -> str: + """e.g. '2 weather, 1 fire, 1 mesh'""" + parts = [] + for toggle in TOGGLE_ORDER: + events = section.get(toggle) + if not events: + continue + label = TOGGLE_LABELS.get(toggle, toggle).lower() + parts.append(f"{len(events)} {label}") + return ", ".join(parts) + + def _render_full(self, digest: Digest, now: float) -> str: + """Produce the full multi-line digest for email/webhook.""" + lines = [ + f"--- {time.strftime('%H%M', time.localtime(now))} Digest ---", + "", + ] + + if digest.active: + lines.append("ACTIVE NOW:") + for toggle in TOGGLE_ORDER: + events = digest.active.get(toggle) + if not events: + continue + label = TOGGLE_LABELS.get(toggle, toggle) + for ev in self._sort_events(events): + lines.append(f" [{label}] {self._format_event_line(ev)}") + lines.append("") + else: + lines.append("ACTIVE NOW: nothing") + lines.append("") + + if digest.since_last: + lines.append("SINCE LAST DIGEST:") + for toggle in TOGGLE_ORDER: + events = digest.since_last.get(toggle) + if not events: + continue + label = TOGGLE_LABELS.get(toggle, toggle) + for ev in self._sort_events(events): + lines.append(f" [{label}] {self._format_event_line(ev)}") + lines.append("") + + return "\n".join(lines).rstrip() + "\n" + + def _format_event_line(self, event: Event) -> str: + """Single-line summary of an event for digest output.""" + # Prefer event.summary if set, else fall back to title + text = event.summary or event.title or event.category + # Append expires hint if available + if event.expires is not None and event.expires > self._now(): + try: + expires_str = time.strftime( + "%H:%M", time.localtime(event.expires) + ) + text = f"{text} (until {expires_str})" + except (ValueError, OverflowError): + pass + # Trim runaway text — keep digest readable + if len(text) > 140: + text = text[:139] + "…" + return text + + def _sort_events(self, events: list[Event]) -> list[Event]: + """Sort within a toggle: immediate first, then priority, + then routine, then by timestamp newest first.""" + rank = {"immediate": 0, "priority": 1, "routine": 2} + return sorted( + events, + key=lambda e: (rank.get(e.severity, 3), -e.timestamp), + ) + + # ---- helpers ---- + + def _is_resolution(self, event: Event, now: float) -> bool: + if event.expires is not None and event.expires <= now: + return True + title_lc = (event.title or "").lower() + return any(marker in title_lc for marker in RESOLUTION_MARKERS) + + def _move_to_since_last_by_group(self, event: Event, toggle: str) -> None: + """Remove any active event matching event's group_key (or id) + and place this resolution event into since_last. + """ + active_list = self._active.get(toggle, []) + # Match by group_key if set, else by id + match_key = event.group_key + if match_key: + self._active[toggle] = [ + e for e in active_list + if e.group_key != match_key + ] + else: + self._active[toggle] = [ + e for e in active_list if e.id != event.id + ] + self._since_last.setdefault(toggle, []).append(event) + self._logger.debug( + f"RESOLVED in {toggle}: {event.id} ({event.title!r})" + ) + + def _now(self) -> float: + return time.time() + + # ---- inspection (for tests and future scheduler) ---- + + def active_count(self, toggle: Optional[str] = None) -> int: + if toggle is not None: + return len(self._active.get(toggle, [])) + return sum(len(v) for v in self._active.values()) + + def since_last_count(self, toggle: Optional[str] = None) -> int: + if toggle is not None: + return len(self._since_last.get(toggle, [])) + return sum(len(v) for v in self._since_last.values()) + + def last_digest_at(self) -> float: + return self._last_digest_at + + def clear(self) -> None: + self._active.clear() + self._since_last.clear() + self._last_digest_at = 0.0 diff --git a/tests/test_pipeline_digest.py b/tests/test_pipeline_digest.py new file mode 100644 index 0000000..a72be74 --- /dev/null +++ b/tests/test_pipeline_digest.py @@ -0,0 +1,395 @@ +"""Tests for Phase 2.3a DigestAccumulator. + +13 tests covering: +- Accumulator active/since_last behavior (6 tests) +- Renderer output (6 tests) +- Pipeline integration (1 test already covered, plus 2 more) +""" + +import time +from unittest.mock import MagicMock, patch + +import pytest + +from meshai.notifications.events import make_event +from meshai.notifications.pipeline import ( + build_pipeline_components, + DigestAccumulator, + Digest, +) +from meshai.config import Config + + +# ============================================================ +# ACCUMULATOR ACTIVE/SINCE_LAST TESTS +# ============================================================ + +def test_enqueue_adds_to_active(): + """Enqueue one routine Event with no expires → active_count == 1.""" + acc = DigestAccumulator() + event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Wind Advisory", + ) + acc.enqueue(event) + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 + + +def test_enqueue_same_id_updates_in_place(): + """Enqueue same id twice → still 1 active, title updated.""" + acc = DigestAccumulator() + event1 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="abc", + title="initial", + ) + event2 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="abc", + title="updated", + ) + acc.enqueue(event1) + acc.enqueue(event2) + assert acc.active_count() == 1 + # Check the held event's title + toggle = "weather" + events = acc._active.get(toggle, []) + assert len(events) == 1 + assert events[0].title == "updated" + + +def test_two_different_ids_both_active(): + """Two different routine events → both active.""" + acc = DigestAccumulator() + event1 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="ev1", + title="Event 1", + ) + event2 = make_event( + source="test", + category="weather_warning", + severity="routine", + id="ev2", + title="Event 2", + ) + acc.enqueue(event1) + acc.enqueue(event2) + assert acc.active_count() == 2 + + +def test_resolution_marker_in_title_moves_active_to_since_last(): + """Resolution marker in title moves matching active to since_last.""" + acc = DigestAccumulator() + event1 = make_event( + source="test", + category="wildfire_proximity", + severity="priority", + group_key="fire:42", + title="Snake River Fire", + ) + acc.enqueue(event1) + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 + + event2 = make_event( + source="test", + category="wildfire_proximity", + severity="priority", + group_key="fire:42", + title="Snake River Fire ended", + ) + acc.enqueue(event2) + assert acc.active_count() == 0 + assert acc.since_last_count() == 1 + + +def test_expired_event_via_tick_moves_to_since_last(): + """tick() moves expired events from active to since_last.""" + acc = DigestAccumulator() + base_time = 1000000.0 + + # Monkeypatch _now to control time + acc._now = lambda: base_time + + event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Temporary Warning", + expires=base_time + 60, # expires in 60 seconds + ) + acc.enqueue(event) + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 + + # Tick at base_time + 30 → still active + moved = acc.tick(now=base_time + 30) + assert moved == 0 + assert acc.active_count() == 1 + + # Tick at base_time + 120 → expired, moved to since_last + moved = acc.tick(now=base_time + 120) + assert moved == 1 + assert acc.active_count() == 0 + assert acc.since_last_count() == 1 + + +def test_render_digest_clears_since_last_but_keeps_active(): + """render_digest() clears since_last but preserves active.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add an active event + active_event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Ongoing Storm", + ) + acc.enqueue(active_event) + + # Add an event that becomes since_last via resolution marker + resolved_event = make_event( + source="test", + category="road_closure", + severity="routine", + group_key="roads:99", + title="US-93 reopened at MP 47", + ) + acc.enqueue(resolved_event) + + # Now we should have 1 active, 1 since_last + assert acc.active_count() == 1 + assert acc.since_last_count() == 1 + + # Render digest + digest = acc.render_digest(now=base_time) + assert len(digest.active) > 0 + assert len(digest.since_last) > 0 + + # After render: active preserved, since_last cleared + assert acc.active_count() == 1 + assert acc.since_last_count() == 0 + + # Second render has only active + digest2 = acc.render_digest(now=base_time + 10) + assert len(digest2.active) > 0 + assert len(digest2.since_last) == 0 + + +# ============================================================ +# RENDERER TESTS +# ============================================================ + +def test_render_full_lists_active_and_since_last_with_labels(): + """Full render includes ACTIVE NOW, SINCE LAST DIGEST, toggle labels.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Weather event (active) + weather_event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Wind Advisory until 21:00", + ) + acc.enqueue(weather_event) + + # Roads event with resolution marker → since_last + roads_event = make_event( + source="test", + category="road_closure", + severity="routine", + title="US-93 reopened at MP 47", + ) + acc.enqueue(roads_event) + + digest = acc.render_digest(now=base_time) + + assert "ACTIVE NOW:" in digest.full + assert "[Weather]" in digest.full + assert "Wind Advisory" in digest.full + assert "SINCE LAST DIGEST:" in digest.full + assert "[Roads]" in digest.full + assert "US-93" in digest.full + + +def test_render_mesh_compact_under_char_limit(): + """mesh_compact is <= 200 chars with proper format.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add 10 events across 4 toggles + categories = [ + ("weather_warning", "Weather Event"), + ("weather_warning", "Weather Event 2"), + ("weather_warning", "Weather Event 3"), + ("wildfire_proximity", "Fire Event"), + ("wildfire_proximity", "Fire Event 2"), + ("battery_warning", "Mesh Event"), + ("battery_warning", "Mesh Event 2"), + ("battery_warning", "Mesh Event 3"), + ("road_closure", "Road Event"), + ("road_closure", "Road Event 2"), + ] + for i, (cat, title) in enumerate(categories): + event = make_event( + source="test", + category=cat, + severity="routine", + id=f"ev{i}", + title=title, + ) + acc.enqueue(event) + + digest = acc.render_digest(now=base_time) + assert len(digest.mesh_compact) <= 200 + assert digest.mesh_compact.startswith("DIGEST ") + assert "ACTIVE:" in digest.mesh_compact + + +def test_render_mesh_compact_all_quiet_when_empty(): + """Empty accumulator renders 'All quiet.' in mesh_compact.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + digest = acc.render_digest(now=base_time) + assert "All quiet" in digest.mesh_compact + + +def test_render_full_handles_empty_accumulator(): + """Empty accumulator → is_empty() True, 'ACTIVE NOW: nothing'.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + digest = acc.render_digest(now=base_time) + assert digest.is_empty() is True + assert "ACTIVE NOW: nothing" in digest.full + + +def test_render_orders_toggles_by_priority(): + """Toggles appear in TOGGLE_ORDER sequence in full output.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + # Add one event each for weather, mesh_health, and fire + # (intentionally out of order) + mesh_event = make_event( + source="test", + category="battery_warning", # maps to mesh_health toggle + severity="routine", + title="Mesh battery low", + ) + fire_event = make_event( + source="test", + category="wildfire_proximity", + severity="routine", + title="Fire nearby", + ) + weather_event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Storm coming", + ) + acc.enqueue(mesh_event) + acc.enqueue(fire_event) + acc.enqueue(weather_event) + + digest = acc.render_digest(now=base_time) + + # In TOGGLE_ORDER: weather, fire, ..., mesh_health + weather_pos = digest.full.find("[Weather]") + fire_pos = digest.full.find("[Fire]") + mesh_pos = digest.full.find("[Mesh]") + + assert weather_pos < fire_pos, "Weather should appear before Fire" + assert fire_pos < mesh_pos, "Fire should appear before Mesh" + + +def test_format_event_line_appends_expires_hint(): + """_format_event_line() appends '(until HH:MM)' for future expires.""" + acc = DigestAccumulator() + base_time = 1000000.0 + acc._now = lambda: base_time + + event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Severe Thunderstorm Warning", + expires=base_time + 3600, # 1 hour in future + ) + + line = acc._format_event_line(event) + assert "(until " in line + # Should have time in HH:MM format + assert ":" in line.split("(until ")[-1] + + +# ============================================================ +# PIPELINE INTEGRATION TESTS +# ============================================================ + +def test_pipeline_routes_routine_event_to_accumulator(): + """Routine event via bus.emit ends up in DigestAccumulator.""" + config = Config() + bus, inhibitor, grouper, severity_router, dispatcher, digest = \ + build_pipeline_components(config) + + event = make_event( + source="test", + category="weather_warning", + severity="routine", + title="Test routine event", + ) + + # Flush through grouper + grouper.flush_all() + bus.emit(event) + grouper.flush_all() + + assert digest.active_count() == 1 + + +def test_pipeline_routes_immediate_event_to_dispatcher_not_accumulator(): + """Immediate event goes to dispatcher, not accumulator.""" + config = Config() + bus, inhibitor, grouper, severity_router, dispatcher, digest = \ + build_pipeline_components(config) + + # Mock the severity_router's immediate handler (already bound to dispatcher.dispatch) + mock_immediate = MagicMock() + severity_router._immediate = mock_immediate + + event = make_event( + source="test", + category="weather_warning", + severity="immediate", + title="Test immediate event", + ) + + grouper.flush_all() + bus.emit(event) + grouper.flush_all() + + # Immediate handler should have been called + assert mock_immediate.called + # Accumulator should have nothing + assert digest.active_count() == 0