diff --git a/meshai/notifications/pipeline/__init__.py b/meshai/notifications/pipeline/__init__.py index fcc5314..3e14c34 100644 --- a/meshai/notifications/pipeline/__init__.py +++ b/meshai/notifications/pipeline/__init__.py @@ -1,10 +1,12 @@ """Notification pipeline package. -Phase 2.1 skeleton: - - EventBus: pub/sub for adapter ingress - - SeverityRouter: forks immediate vs digest paths - - Dispatcher: routes immediate events to channels via existing rules - - StubDigestQueue: placeholder for Phase 2.3 aggregator +Phase 2.1 + 2.2: + - EventBus: pub/sub ingress + - Inhibitor: suppresses redundant events by inhibit_keys + - Grouper: coalesces events sharing group_key within a window + - SeverityRouter: forks immediate vs digest + - Dispatcher: routes immediate via channels (existing rules schema) + - StubDigestQueue: placeholder for Phase 2.3 Usage: from meshai.notifications.pipeline import build_pipeline @@ -19,14 +21,15 @@ from meshai.notifications.pipeline.severity_router import ( StubDigestQueue, ) from meshai.notifications.pipeline.dispatcher import Dispatcher +from meshai.notifications.pipeline.inhibitor import Inhibitor +from meshai.notifications.pipeline.grouper import Grouper def build_pipeline(config) -> EventBus: """Build the pipeline and return the EventBus. - Adapters emit events to this bus and they flow through the - severity router to either the dispatcher (immediate) or the - digest stub (priority/routine). + Wiring: + bus -> inhibitor -> grouper -> severity_router -> (dispatcher | digest_stub) """ bus = EventBus() dispatcher = Dispatcher(config, create_channel) @@ -35,14 +38,16 @@ def build_pipeline(config) -> EventBus: immediate_handler=dispatcher.dispatch, digest_handler=digest.enqueue, ) - bus.subscribe(severity_router.handle) + grouper = Grouper(next_handler=severity_router.handle) + inhibitor = Inhibitor(next_handler=grouper.handle) + bus.subscribe(inhibitor.handle) return bus def build_pipeline_components(config) -> tuple: """Like build_pipeline, but returns all components for test inspection. - Returns (bus, dispatcher, digest, severity_router). + Returns (bus, inhibitor, grouper, severity_router, dispatcher, digest). """ bus = EventBus() dispatcher = Dispatcher(config, create_channel) @@ -51,8 +56,10 @@ def build_pipeline_components(config) -> tuple: immediate_handler=dispatcher.dispatch, digest_handler=digest.enqueue, ) - bus.subscribe(severity_router.handle) - return bus, dispatcher, digest, severity_router + grouper = Grouper(next_handler=severity_router.handle) + inhibitor = Inhibitor(next_handler=grouper.handle) + bus.subscribe(inhibitor.handle) + return bus, inhibitor, grouper, severity_router, dispatcher, digest __all__ = [ @@ -60,6 +67,8 @@ __all__ = [ "SeverityRouter", "StubDigestQueue", "Dispatcher", + "Inhibitor", + "Grouper", "build_pipeline", "build_pipeline_components", "get_bus", diff --git a/meshai/notifications/pipeline/grouper.py b/meshai/notifications/pipeline/grouper.py new file mode 100644 index 0000000..231d45c --- /dev/null +++ b/meshai/notifications/pipeline/grouper.py @@ -0,0 +1,96 @@ +"""Event grouper. + +Coalesces events sharing a group_key inside a time window. The most +recent event for a group_key wins; older versions are replaced. +Events without a group_key pass through immediately. + +The grouper holds events in a window. tick() flushes events whose +window has expired. For Phase 2.2, tick() is called explicitly by +tests; Phase 2.3+ will integrate with the digest scheduler for live +operation. +""" + +import logging +import time +from typing import Callable + +from meshai.notifications.events import Event + + +class Grouper: + """Coalesce same-group_key events inside a window.""" + + def __init__( + self, + next_handler: Callable[[Event], None], + window_seconds: float = 60.0, + ): + """Initialize. + + Args: + next_handler: Callable that receives events when they + exit the grouper (either immediately if no group_key, + or after the window expires). + window_seconds: How long to hold a group_key before + emitting downstream (default 60 seconds). + """ + self._next = next_handler + self._window = window_seconds + # {group_key: (event, hold_until_ts)} + self._held: dict[str, tuple[Event, float]] = {} + self._logger = logging.getLogger("meshai.pipeline.grouper") + + def _now(self) -> float: + return time.time() + + def handle(self, event: Event) -> None: + """Process an event. + + Events without group_key pass through immediately. + Events with group_key are held, replacing any prior held + event with the same group_key. The held event is emitted + later via tick(). + """ + if not event.group_key: + self._next(event) + return + + now = self._now() + hold_until = now + self._window + prior = self._held.get(event.group_key) + if prior is not None: + self._logger.info( + f"COALESCED event {event.id} into group {event.group_key!r}, " + f"replacing prior event {prior[0].id}" + ) + self._held[event.group_key] = (event, hold_until) + + def tick(self) -> int: + """Flush events whose window has expired. + + Returns the number of events emitted. + """ + now = self._now() + to_emit = [ + (gk, ev) for gk, (ev, hu) in self._held.items() if hu <= now + ] + for gk, _ in to_emit: + del self._held[gk] + for _, ev in to_emit: + self._next(ev) + return len(to_emit) + + def flush_all(self) -> int: + """Immediately emit every held event, regardless of window. + + Used at shutdown and by tests. Returns count emitted. + """ + events = [ev for ev, _ in self._held.values()] + self._held.clear() + for ev in events: + self._next(ev) + return len(events) + + def held_count(self) -> int: + """For tests: number of events currently held.""" + return len(self._held) diff --git a/meshai/notifications/pipeline/inhibitor.py b/meshai/notifications/pipeline/inhibitor.py new file mode 100644 index 0000000..9908233 --- /dev/null +++ b/meshai/notifications/pipeline/inhibitor.py @@ -0,0 +1,92 @@ +"""Severity-based event inhibitor. + +Suppresses lower-severity events when a higher-severity event for the +same logical incident (matching inhibit_keys) is already active. + +Inhibit keys are operator-defined strings on the Event that identify +the underlying incident. Two events sharing an inhibit_key refer to +the same situation. If a critical event for "battery:BLD-MTN" fired +recently, a subsequent warning event with the same key gets suppressed. +""" + +import logging +import time +from typing import Callable + +from meshai.notifications.events import Event + + +class Inhibitor: + """Suppress lower-severity events when higher-severity is active.""" + + SEVERITY_RANK = {"routine": 0, "priority": 1, "immediate": 2} + + def __init__( + self, + next_handler: Callable[[Event], None], + ttl_seconds: float = 1800.0, + ): + """Initialize. + + Args: + next_handler: Callable that receives non-suppressed events. + ttl_seconds: How long an inhibit_key remains active after + the originating event (default 30 minutes). + """ + self._next = next_handler + self._ttl = ttl_seconds + # {inhibit_key: (rank, expires_at)} + self._active: dict[str, tuple[int, float]] = {} + self._logger = logging.getLogger("meshai.pipeline.inhibitor") + + def _now(self) -> float: + # Hookable for tests + return time.time() + + def _prune_expired(self, now: float) -> None: + expired = [k for k, (_, exp) in self._active.items() if exp <= now] + for k in expired: + del self._active[k] + + def handle(self, event: Event) -> None: + """Process an event: either suppress it or pass it on. + + If any of the event's inhibit_keys is currently active at a + higher-or-equal rank, the event is suppressed. Otherwise, the + event's inhibit_keys are recorded/upgraded, and the event is + passed to the next handler. + """ + now = self._now() + self._prune_expired(now) + + event_rank = self.SEVERITY_RANK.get(event.severity, 0) + + # Check suppression + for key in event.inhibit_keys: + entry = self._active.get(key) + if entry is not None: + active_rank, _ = entry + if active_rank >= event_rank: + self._logger.info( + f"SUPPRESSED event {event.id} ({event.severity}) " + f"by active key {key!r} at rank {active_rank}" + ) + return + + # Record / upgrade entries + new_expires = now + self._ttl + for key in event.inhibit_keys: + existing = self._active.get(key) + if existing is None or existing[0] < event_rank: + self._active[key] = (event_rank, new_expires) + + # Pass through + self._next(event) + + def active_keys(self) -> dict[str, tuple[int, float]]: + """For tests: snapshot of currently-active inhibit keys.""" + return dict(self._active) + + def clear(self) -> None: + """For tests: reset state.""" + self._active.clear() diff --git a/tests/test_pipeline_inhibitor_grouper.py b/tests/test_pipeline_inhibitor_grouper.py new file mode 100644 index 0000000..13ded09 --- /dev/null +++ b/tests/test_pipeline_inhibitor_grouper.py @@ -0,0 +1,194 @@ +"""Tests for Phase 2.2 inhibitor and grouper.""" + +import pytest +from unittest.mock import Mock + +from meshai.notifications.events import Event +from meshai.notifications.pipeline.inhibitor import Inhibitor +from meshai.notifications.pipeline.grouper import Grouper + + +def make_event(id, severity, inhibit_keys=None, group_key=None): + return Event( + id=id, + source="test", + category="test_cat", + severity=severity, + title=f"Event {id}", + inhibit_keys=inhibit_keys or [], + group_key=group_key, + ) + + +# ===================== INHIBITOR TESTS ===================== + +class TestInhibitor: + + def test_event_without_inhibit_keys_passes_through(self): + next_handler = Mock() + inhibitor = Inhibitor(next_handler) + event = make_event("e1", "immediate", inhibit_keys=[]) + inhibitor.handle(event) + next_handler.assert_called_once_with(event) + + def test_lower_severity_after_higher_is_suppressed(self): + next_handler = Mock() + inhibitor = Inhibitor(next_handler) + ev1 = make_event("e1", "immediate", inhibit_keys=["battery:NODE1"]) + ev2 = make_event("e2", "priority", inhibit_keys=["battery:NODE1"]) + inhibitor.handle(ev1) + inhibitor.handle(ev2) + assert next_handler.call_count == 1 + next_handler.assert_called_once_with(ev1) + + def test_equal_severity_is_suppressed(self): + next_handler = Mock() + inhibitor = Inhibitor(next_handler) + ev1 = make_event("e1", "priority", inhibit_keys=["key1"]) + ev2 = make_event("e2", "priority", inhibit_keys=["key1"]) + inhibitor.handle(ev1) + inhibitor.handle(ev2) + assert next_handler.call_count == 1 + + def test_higher_severity_after_lower_passes_and_upgrades(self): + next_handler = Mock() + inhibitor = Inhibitor(next_handler) + ev1 = make_event("e1", "priority", inhibit_keys=["key1"]) + ev2 = make_event("e2", "immediate", inhibit_keys=["key1"]) + inhibitor.handle(ev1) + inhibitor.handle(ev2) + assert next_handler.call_count == 2 + keys = inhibitor.active_keys() + assert keys["key1"][0] == 2 # immediate rank + + def test_inhibit_key_expires_after_ttl(self): + next_handler = Mock() + inhibitor = Inhibitor(next_handler, ttl_seconds=10) + current_time = [0.0] + inhibitor._now = lambda: current_time[0] + + ev1 = make_event("e1", "immediate", inhibit_keys=["key1"]) + current_time[0] = 0.0 + inhibitor.handle(ev1) + + current_time[0] = 15.0 + ev2 = make_event("e2", "routine", inhibit_keys=["key1"]) + inhibitor.handle(ev2) + + assert next_handler.call_count == 2 + + def test_multiple_keys_any_active_suppresses(self): + next_handler = Mock() + inhibitor = Inhibitor(next_handler) + + ev1 = make_event("e1", "immediate", inhibit_keys=["a", "b"]) + inhibitor.handle(ev1) + assert next_handler.call_count == 1 + + ev2 = make_event("e2", "routine", inhibit_keys=["b", "c"]) + inhibitor.handle(ev2) + assert next_handler.call_count == 1 # suppressed by "b" + + ev3 = make_event("e3", "routine", inhibit_keys=["c", "d"]) + inhibitor.handle(ev3) + assert next_handler.call_count == 2 # passes, no active key + + +# ===================== GROUPER TESTS ===================== + +class TestGrouper: + + def test_event_without_group_key_emits_immediately(self): + next_handler = Mock() + grouper = Grouper(next_handler) + event = make_event("e1", "immediate", group_key=None) + grouper.handle(event) + next_handler.assert_called_once_with(event) + + def test_event_with_group_key_is_held_not_emitted(self): + next_handler = Mock() + grouper = Grouper(next_handler) + event = make_event("e1", "immediate", group_key="fire:42") + grouper.handle(event) + next_handler.assert_not_called() + assert grouper.held_count() == 1 + + def test_second_same_group_key_replaces_first(self): + next_handler = Mock() + grouper = Grouper(next_handler) + ev1 = make_event("e1", "immediate", group_key="fire:42") + ev2 = make_event("e2", "immediate", group_key="fire:42") + grouper.handle(ev1) + grouper.handle(ev2) + next_handler.assert_not_called() + assert grouper.held_count() == 1 + grouper.flush_all() + assert next_handler.call_count == 1 + emitted_event = next_handler.call_args[0][0] + assert emitted_event.id == "e2" + + def test_tick_emits_when_window_expired(self): + next_handler = Mock() + grouper = Grouper(next_handler, window_seconds=5) + current_time = [0.0] + grouper._now = lambda: current_time[0] + + current_time[0] = 0.0 + event = make_event("e1", "immediate", group_key="g") + grouper.handle(event) + assert grouper.held_count() == 1 + + current_time[0] = 3.0 + grouper.tick() + next_handler.assert_not_called() + assert grouper.held_count() == 1 + + current_time[0] = 10.0 + grouper.tick() + next_handler.assert_called_once() + assert grouper.held_count() == 0 + + def test_flush_all_emits_everything_immediately(self): + next_handler = Mock() + grouper = Grouper(next_handler) + ev1 = make_event("e1", "immediate", group_key="g1") + ev2 = make_event("e2", "immediate", group_key="g2") + ev3 = make_event("e3", "immediate", group_key="g3") + grouper.handle(ev1) + grouper.handle(ev2) + grouper.handle(ev3) + assert grouper.held_count() == 3 + grouper.flush_all() + assert next_handler.call_count == 3 + assert grouper.held_count() == 0 + + +# ===================== INTEGRATION TEST ===================== + +class TestInhibitorGrouperChain: + + def test_inhibitor_then_grouper_chain(self): + terminal = Mock() + grouper = Grouper(next_handler=terminal) + inhibitor = Inhibitor(next_handler=grouper.handle) + + # Send immediate event with group_key and inhibit_keys + ev1 = make_event("e1", "immediate", group_key="g1", inhibit_keys=["k1"]) + inhibitor.handle(ev1) + # After inhibitor: passed (no prior key) + # After grouper: held (group_key present) + terminal.assert_not_called() + assert grouper.held_count() == 1 + + # Send routine event with same group_key and inhibit_keys + ev2 = make_event("e2", "routine", group_key="g1", inhibit_keys=["k1"]) + inhibitor.handle(ev2) + # After inhibitor: SUPPRESSED (k1 active at higher rank) + terminal.assert_not_called() + assert grouper.held_count() == 1 # still 1, not 2 + + # Flush grouper + grouper.flush_all() + terminal.assert_called_once() + emitted = terminal.call_args[0][0] + assert emitted.id == "e1" # the immediate, not suppressed routine